You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hj...@apache.org on 2014/12/03 06:30:27 UTC
[12/30] tajo git commit: TAJO-1122: Refactor the tajo-storage project
structure.
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/ResizableLimitSpec.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/ResizableLimitSpec.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/ResizableLimitSpec.java
new file mode 100644
index 0000000..14e67b2
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/ResizableLimitSpec.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.util.FileUtil;
+
+/**
+ * It specifies the maximum size or increasing ratio. In addition,
+ * it guarantees that all numbers are less than or equal to Integer.MAX_VALUE 2^31
+ * due to ByteBuffer.
+ */
+public class ResizableLimitSpec {
+ private final Log LOG = LogFactory.getLog(ResizableLimitSpec.class);
+
+ public static final int MAX_SIZE_BYTES = Integer.MAX_VALUE;
+ public static final ResizableLimitSpec DEFAULT_LIMIT = new ResizableLimitSpec(Integer.MAX_VALUE);
+
+ private final long initSize;
+ private final long limitBytes;
+ private final float incRatio;
+ private final float allowedOVerflowRatio;
+ private final static float DEFAULT_ALLOWED_OVERFLOW_RATIO = 0.1f;
+ private final static float DEFAULT_INCREASE_RATIO = 1.0f;
+
+ public ResizableLimitSpec(long initSize) {
+ this(initSize, MAX_SIZE_BYTES, DEFAULT_ALLOWED_OVERFLOW_RATIO);
+ }
+
+ public ResizableLimitSpec(long initSize, long limitBytes) {
+ this(initSize, limitBytes, DEFAULT_ALLOWED_OVERFLOW_RATIO);
+ }
+
+ public ResizableLimitSpec(long initSize, long limitBytes, float allowedOverflow) {
+ this(initSize, limitBytes, allowedOverflow, DEFAULT_INCREASE_RATIO);
+ }
+
+ public ResizableLimitSpec(long initSize, long limitBytes, float allowedOverflowRatio, float incRatio) {
+ Preconditions.checkArgument(initSize > 0, "initial size must be greater than 0 bytes.");
+ Preconditions.checkArgument(initSize <= MAX_SIZE_BYTES, "The maximum initial size is 2GB.");
+ Preconditions.checkArgument(limitBytes > 0, "The limit size must be greater than 0 bytes.");
+ Preconditions.checkArgument(limitBytes <= MAX_SIZE_BYTES, "The maximum limit size is 2GB.");
+ Preconditions.checkArgument(incRatio > 0.0f, "Increase Ratio must be greater than 0.");
+
+ if (initSize == limitBytes) {
+ long overflowedSize = (long) (initSize + (initSize * allowedOverflowRatio));
+
+ if (overflowedSize > Integer.MAX_VALUE) {
+ overflowedSize = Integer.MAX_VALUE;
+ }
+
+ this.initSize = overflowedSize;
+ this.limitBytes = overflowedSize;
+ } else {
+ this.initSize = initSize;
+ limitBytes = (long) (limitBytes + (limitBytes * allowedOverflowRatio));
+
+ if (limitBytes > Integer.MAX_VALUE) {
+ this.limitBytes = Integer.MAX_VALUE;
+ } else {
+ this.limitBytes = limitBytes;
+ }
+ }
+
+ this.allowedOVerflowRatio = allowedOverflowRatio;
+ this.incRatio = incRatio;
+ }
+
+ public long initialSize() {
+ return initSize;
+ }
+
+ public long limit() {
+ return limitBytes;
+ }
+
+ public float remainRatio(long currentSize) {
+ Preconditions.checkArgument(currentSize > 0, "Size must be greater than 0 bytes.");
+ if (currentSize > Integer.MAX_VALUE) {
+ currentSize = Integer.MAX_VALUE;
+ }
+ return (float)currentSize / (float)limitBytes;
+ }
+
+ public boolean canIncrease(long currentSize) {
+ return remain(currentSize) > 0;
+ }
+
+ public long remain(long currentSize) {
+ Preconditions.checkArgument(currentSize > 0, "Size must be greater than 0 bytes.");
+ return limitBytes > Integer.MAX_VALUE ? Integer.MAX_VALUE - currentSize : limitBytes - currentSize;
+ }
+
+ public int increasedSize(int currentSize) {
+ if (currentSize < initSize) {
+ return (int) initSize;
+ }
+
+ if (currentSize > Integer.MAX_VALUE) {
+ LOG.warn("Current size already exceeds the maximum size (" + Integer.MAX_VALUE + " bytes)");
+ return Integer.MAX_VALUE;
+ }
+ long nextSize = (long) (currentSize + ((float) currentSize * incRatio));
+
+ if (nextSize > limitBytes) {
+ LOG.info("Increasing reaches size limit (" + FileUtil.humanReadableByteCount(limitBytes, false) + ")");
+ nextSize = limitBytes;
+ }
+
+ if (nextSize > Integer.MAX_VALUE) {
+ LOG.info("Increasing reaches maximum size (" + FileUtil.humanReadableByteCount(Integer.MAX_VALUE, false) + ")");
+ nextSize = Integer.MAX_VALUE;
+ }
+
+ return (int) nextSize;
+ }
+
+ @Override
+ public String toString() {
+ return "init=" + FileUtil.humanReadableByteCount(initSize, false) + ",limit="
+ + FileUtil.humanReadableByteCount(limitBytes, false) + ",overflow_ratio=" + allowedOVerflowRatio
+ + ",inc_ratio=" + incRatio;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/RowWriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/RowWriter.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/RowWriter.java
new file mode 100644
index 0000000..a2b2561
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/RowWriter.java
@@ -0,0 +1,73 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.datum.IntervalDatum;
+import org.apache.tajo.datum.ProtobufDatum;
+
+/**
+ * The call sequence should be as follows:
+ *
+ * <pre>
+ * startRow() --> skipField() or putXXX --> endRow()
+ * </pre>
+ *
+ * The total number of skipField and putXXX invocations must be equivalent to the number of fields.
+ */
+public interface RowWriter {
+
+ public TajoDataTypes.DataType [] dataTypes();
+
+ public boolean startRow();
+
+ public void endRow();
+
+ public void skipField();
+
+ public void putBool(boolean val);
+
+ public void putInt2(short val);
+
+ public void putInt4(int val);
+
+ public void putInt8(long val);
+
+ public void putFloat4(float val);
+
+ public void putFloat8(double val);
+
+ public void putText(String val);
+
+ public void putText(byte[] val);
+
+ public void putBlob(byte[] val);
+
+ public void putTimestamp(long val);
+
+ public void putTime(long val);
+
+ public void putDate(int val);
+
+ public void putInterval(IntervalDatum val);
+
+ public void putInet4(int val);
+
+ public void putProtoDatum(ProtobufDatum datum);
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java
new file mode 100644
index 0000000..b742e6d
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java
@@ -0,0 +1,311 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.Message;
+
+import org.apache.tajo.datum.*;
+import org.apache.tajo.exception.UnsupportedException;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.util.SizeOf;
+import org.apache.tajo.util.StringUtils;
+import org.apache.tajo.util.UnsafeUtil;
+
+import sun.misc.Unsafe;
+import sun.nio.ch.DirectBuffer;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.charset.Charset;
+
+import static org.apache.tajo.common.TajoDataTypes.DataType;
+
+public abstract class UnSafeTuple implements Tuple {
+ private static final Unsafe UNSAFE = UnsafeUtil.unsafe;
+
+ private DirectBuffer bb;
+ private int relativePos;
+ private int length;
+ private DataType [] types;
+
+ protected void set(ByteBuffer bb, int relativePos, int length, DataType [] types) {
+ this.bb = (DirectBuffer) bb;
+ this.relativePos = relativePos;
+ this.length = length;
+ this.types = types;
+ }
+
+ void set(ByteBuffer bb, DataType [] types) {
+ set(bb, 0, bb.limit(), types);
+ }
+
+ @Override
+ public int size() {
+ return types.length;
+ }
+
+ public ByteBuffer nioBuffer() {
+ return ((ByteBuffer)((ByteBuffer)bb).duplicate().position(relativePos).limit(relativePos + length)).slice();
+ }
+
+ public HeapTuple toHeapTuple() {
+ byte [] bytes = new byte[length];
+ UNSAFE.copyMemory(null, bb.address() + relativePos, bytes, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, length);
+ return new HeapTuple(bytes, types);
+ }
+
+ public void copyFrom(UnSafeTuple tuple) {
+ Preconditions.checkNotNull(tuple);
+
+ ((ByteBuffer) bb).clear();
+ if (length < tuple.length) {
+ UnsafeUtil.free((ByteBuffer) bb);
+ bb = (DirectBuffer) ByteBuffer.allocateDirect(tuple.length).order(ByteOrder.nativeOrder());
+ this.relativePos = 0;
+ this.length = tuple.length;
+ }
+
+ ((ByteBuffer) bb).put(tuple.nioBuffer());
+ }
+
+ private int getFieldOffset(int fieldId) {
+ return UNSAFE.getInt(bb.address() + relativePos + SizeOf.SIZE_OF_INT + (fieldId * SizeOf.SIZE_OF_INT));
+ }
+
+ public long getFieldAddr(int fieldId) {
+ int fieldOffset = getFieldOffset(fieldId);
+ if (fieldOffset == -1) {
+ throw new RuntimeException("Invalid Field Access: " + fieldId);
+ }
+ return bb.address() + relativePos + fieldOffset;
+ }
+
+ @Override
+ public boolean contains(int fieldid) {
+ return getFieldOffset(fieldid) > OffHeapRowBlock.NULL_FIELD_OFFSET;
+ }
+
+ @Override
+ public boolean isNull(int fieldid) {
+ return getFieldOffset(fieldid) == OffHeapRowBlock.NULL_FIELD_OFFSET;
+ }
+
+ @Override
+ public boolean isNotNull(int fieldid) {
+ return getFieldOffset(fieldid) > OffHeapRowBlock.NULL_FIELD_OFFSET;
+ }
+
+ @Override
+ public void clear() {
+ // nothing to do
+ }
+
+ @Override
+ public void put(int fieldId, Datum value) {
+ throw new UnsupportedException("UnSafeTuple does not support put(int, Datum).");
+ }
+
+ @Override
+ public void put(int fieldId, Datum[] values) {
+ throw new UnsupportedException("UnSafeTuple does not support put(int, Datum []).");
+ }
+
+ @Override
+ public void put(int fieldId, Tuple tuple) {
+ throw new UnsupportedException("UnSafeTuple does not support put(int, Tuple).");
+ }
+
+ @Override
+ public void put(Datum[] values) {
+ throw new UnsupportedException("UnSafeTuple does not support put(Datum []).");
+ }
+
+ @Override
+ public Datum get(int fieldId) {
+ if (isNull(fieldId)) {
+ return NullDatum.get();
+ }
+
+ switch (types[fieldId].getType()) {
+ case BOOLEAN:
+ return DatumFactory.createBool(getBool(fieldId));
+ case INT1:
+ case INT2:
+ return DatumFactory.createInt2(getInt2(fieldId));
+ case INT4:
+ return DatumFactory.createInt4(getInt4(fieldId));
+ case INT8:
+ return DatumFactory.createInt8(getInt4(fieldId));
+ case FLOAT4:
+ return DatumFactory.createFloat4(getFloat4(fieldId));
+ case FLOAT8:
+ return DatumFactory.createFloat8(getFloat8(fieldId));
+ case TEXT:
+ return DatumFactory.createText(getText(fieldId));
+ case TIMESTAMP:
+ return DatumFactory.createTimestamp(getInt8(fieldId));
+ case DATE:
+ return DatumFactory.createDate(getInt4(fieldId));
+ case TIME:
+ return DatumFactory.createTime(getInt8(fieldId));
+ case INTERVAL:
+ return getInterval(fieldId);
+ case INET4:
+ return DatumFactory.createInet4(getInt4(fieldId));
+ case PROTOBUF:
+ return getProtobufDatum(fieldId);
+ default:
+ throw new UnsupportedException("Unknown type: " + types[fieldId]);
+ }
+ }
+
+ @Override
+ public void setOffset(long offset) {
+ }
+
+ @Override
+ public long getOffset() {
+ return 0;
+ }
+
+ @Override
+ public boolean getBool(int fieldId) {
+ return UNSAFE.getByte(getFieldAddr(fieldId)) == 0x01;
+ }
+
+ @Override
+ public byte getByte(int fieldId) {
+ return UNSAFE.getByte(getFieldAddr(fieldId));
+ }
+
+ @Override
+ public char getChar(int fieldId) {
+ return UNSAFE.getChar(getFieldAddr(fieldId));
+ }
+
+ @Override
+ public byte[] getBytes(int fieldId) {
+ long pos = getFieldAddr(fieldId);
+ int len = UNSAFE.getInt(pos);
+ pos += SizeOf.SIZE_OF_INT;
+
+ byte [] bytes = new byte[len];
+ UNSAFE.copyMemory(null, pos, bytes, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, len);
+ return bytes;
+ }
+
+ @Override
+ public short getInt2(int fieldId) {
+ long addr = getFieldAddr(fieldId);
+ return UNSAFE.getShort(addr);
+ }
+
+ @Override
+ public int getInt4(int fieldId) {
+ return UNSAFE.getInt(getFieldAddr(fieldId));
+ }
+
+ @Override
+ public long getInt8(int fieldId) {
+ return UNSAFE.getLong(getFieldAddr(fieldId));
+ }
+
+ @Override
+ public float getFloat4(int fieldId) {
+ return UNSAFE.getFloat(getFieldAddr(fieldId));
+ }
+
+ @Override
+ public double getFloat8(int fieldId) {
+ return UNSAFE.getDouble(getFieldAddr(fieldId));
+ }
+
+ @Override
+ public String getText(int fieldId) {
+ long pos = getFieldAddr(fieldId);
+ int len = UNSAFE.getInt(pos);
+ pos += SizeOf.SIZE_OF_INT;
+
+ byte [] bytes = new byte[len];
+ UNSAFE.copyMemory(null, pos, bytes, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, len);
+ return new String(bytes);
+ }
+
+ public IntervalDatum getInterval(int fieldId) {
+ long pos = getFieldAddr(fieldId);
+ int months = UNSAFE.getInt(pos);
+ pos += SizeOf.SIZE_OF_INT;
+ long millisecs = UNSAFE.getLong(pos);
+ return new IntervalDatum(months, millisecs);
+ }
+
+ @Override
+ public Datum getProtobufDatum(int fieldId) {
+ byte [] bytes = getBytes(fieldId);
+
+ ProtobufDatumFactory factory = ProtobufDatumFactory.get(types[fieldId].getCode());
+ Message.Builder builder = factory.newBuilder();
+ try {
+ builder.mergeFrom(bytes);
+ } catch (InvalidProtocolBufferException e) {
+ return NullDatum.get();
+ }
+
+ return new ProtobufDatum(builder.build());
+ }
+
+ @Override
+ public char[] getUnicodeChars(int fieldId) {
+ long pos = getFieldAddr(fieldId);
+ int len = UNSAFE.getInt(pos);
+ pos += SizeOf.SIZE_OF_INT;
+
+ byte [] bytes = new byte[len];
+ UNSAFE.copyMemory(null, pos, bytes, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, len);
+ return StringUtils.convertBytesToChars(bytes, Charset.forName("UTF-8"));
+ }
+
+ @Override
+ public Tuple clone() throws CloneNotSupportedException {
+ return toHeapTuple();
+ }
+
+ @Override
+ public Datum[] getValues() {
+ Datum [] datums = new Datum[size()];
+ for (int i = 0; i < size(); i++) {
+ if (contains(i)) {
+ datums[i] = get(i);
+ } else {
+ datums[i] = NullDatum.get();
+ }
+ }
+ return datums;
+ }
+
+ @Override
+ public String toString() {
+ return VTuple.toDisplayString(getValues());
+ }
+
+ public abstract void release();
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTupleBytesComparator.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTupleBytesComparator.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTupleBytesComparator.java
new file mode 100644
index 0000000..73e1e2f
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTupleBytesComparator.java
@@ -0,0 +1,99 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import com.google.common.primitives.Longs;
+import com.google.common.primitives.UnsignedLongs;
+import org.apache.tajo.util.SizeOf;
+import org.apache.tajo.util.UnsafeUtil;
+import sun.misc.Unsafe;
+
+import java.nio.ByteOrder;
+
+/**
+ * It directly access UTF bytes in UnSafeTuple without any copy. It is used by compiled TupleComparator.
+ */
+public class UnSafeTupleBytesComparator {
+ private static final Unsafe UNSAFE = UnsafeUtil.unsafe;
+
+ static final boolean littleEndian =
+ ByteOrder.nativeOrder().equals(ByteOrder.LITTLE_ENDIAN);
+
+ public static int compare(long ptr1, long ptr2) {
+ int lstrLen = UNSAFE.getInt(ptr1);
+ int rstrLen = UNSAFE.getInt(ptr2);
+
+ ptr1 += SizeOf.SIZE_OF_INT;
+ ptr2 += SizeOf.SIZE_OF_INT;
+
+ int minLength = Math.min(lstrLen, rstrLen);
+ int minWords = minLength / Longs.BYTES;
+
+ /*
+ * Compare 8 bytes at a time. Benchmarking shows comparing 8 bytes at a
+ * time is no slower than comparing 4 bytes at a time even on 32-bit.
+ * On the other hand, it is substantially faster on 64-bit.
+ */
+ for (int i = 0; i < minWords * Longs.BYTES; i += Longs.BYTES) {
+ long lw = UNSAFE.getLong(ptr1);
+ long rw = UNSAFE.getLong(ptr2);
+ long diff = lw ^ rw;
+
+ if (diff != 0) {
+ if (!littleEndian) {
+ return UnsignedLongs.compare(lw, rw);
+ }
+
+ // Use binary search
+ int n = 0;
+ int y;
+ int x = (int) diff;
+ if (x == 0) {
+ x = (int) (diff >>> 32);
+ n = 32;
+ }
+
+ y = x << 16;
+ if (y == 0) {
+ n += 16;
+ } else {
+ x = y;
+ }
+
+ y = x << 8;
+ if (y == 0) {
+ n += 8;
+ }
+ return (int) (((lw >>> n) & 0xFFL) - ((rw >>> n) & 0xFFL));
+ }
+
+ ptr1 += SizeOf.SIZE_OF_LONG;
+ ptr2 += SizeOf.SIZE_OF_LONG;
+ }
+
+ // The epilogue to cover the last (minLength % 8) elements.
+ for (int i = minWords * Longs.BYTES; i < minLength; i++) {
+ int result = UNSAFE.getByte(ptr1++) - UNSAFE.getByte(ptr2++);
+ if (result != 0) {
+ return result;
+ }
+ }
+ return lstrLen - rstrLen;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/ZeroCopyTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/ZeroCopyTuple.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/ZeroCopyTuple.java
new file mode 100644
index 0000000..51dbb29
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/ZeroCopyTuple.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import java.nio.ByteBuffer;
+
+import static org.apache.tajo.common.TajoDataTypes.DataType;
+
+public class ZeroCopyTuple extends UnSafeTuple {
+
+ public void set(ByteBuffer bb, int relativePos, int length, DataType [] types) {
+ super.set(bb, relativePos, length, types);
+ }
+
+ @Override
+ public void release() {
+ // nothing to do
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/main/proto/IndexProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/proto/IndexProtos.proto b/tajo-storage/tajo-storage-common/src/main/proto/IndexProtos.proto
new file mode 100644
index 0000000..f5c8a08
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/proto/IndexProtos.proto
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
// Declare the syntax explicitly; protoc warns when it is omitted and defaults to proto2.
syntax = "proto2";

option java_package = "org.apache.tajo.index";
option java_outer_classname = "IndexProtos";
option optimize_for = SPEED;
option java_generic_services = false;
option java_generate_equals_and_hash = true;

// Provides SchemaProto, SortSpecProto and TupleComparatorSpecProto.
import "CatalogProtos.proto";

// Serializable description of a tuple comparator: the schema of the compared tuples, the
// sort specifications, and the comparator specifications.
message TupleComparatorProto {
  required SchemaProto schema = 1;
  repeated SortSpecProto sortSpecs = 2;
  repeated TupleComparatorSpecProto compSpecs = 3;
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml b/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml
new file mode 100644
index 0000000..47d11c7
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml
@@ -0,0 +1,184 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+
+<configuration>
+ <!-- Storage Manager Configuration -->
+ <property>
+ <name>tajo.storage.manager.hdfs.class</name>
+ <value>org.apache.tajo.storage.FileStorageManager</value>
+ </property>
+ <property>
+ <name>tajo.storage.manager.hbase.class</name>
+ <value>org.apache.tajo.storage.hbase.HBaseStorageManager</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.manager.concurrency.perDisk</name>
+ <value>1</value>
+ <description></description>
+ </property>
+
+ <!--- Registered Scanner Handler -->
+ <property>
+ <name>tajo.storage.scanner-handler</name>
+ <value>textfile,csv,raw,rcfile,row,parquet,sequencefile,avro,hbase</value>
+ </property>
+
+ <!--- Fragment Class Configurations -->
+ <property>
+ <name>tajo.storage.fragment.textfile.class</name>
+ <value>org.apache.tajo.storage.fragment.FileFragment</value>
+ </property>
+ <property>
+ <name>tajo.storage.fragment.csv.class</name>
+ <value>org.apache.tajo.storage.fragment.FileFragment</value>
+ </property>
+ <property>
+ <name>tajo.storage.fragment.raw.class</name>
+ <value>org.apache.tajo.storage.fragment.FileFragment</value>
+ </property>
+ <property>
+ <name>tajo.storage.fragment.rcfile.class</name>
+ <value>org.apache.tajo.storage.fragment.FileFragment</value>
+ </property>
+ <property>
+ <name>tajo.storage.fragment.row.class</name>
+ <value>org.apache.tajo.storage.fragment.FileFragment</value>
+ </property>
+ <property>
+ <name>tajo.storage.fragment.parquet.class</name>
+ <value>org.apache.tajo.storage.fragment.FileFragment</value>
+ </property>
+ <property>
+ <name>tajo.storage.fragment.sequencefile.class</name>
+ <value>org.apache.tajo.storage.fragment.FileFragment</value>
+ </property>
+ <property>
+ <name>tajo.storage.fragment.avro.class</name>
+ <value>org.apache.tajo.storage.fragment.FileFragment</value>
+ </property>
+ <property>
+ <name>tajo.storage.fragment.hbase.class</name>
+ <value>org.apache.tajo.storage.hbase.HBaseFragment</value>
+ </property>
+
+ <!--- Scanner Handler -->
+ <property>
+ <name>tajo.storage.scanner-handler.textfile.class</name>
+ <value>org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileScanner</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.scanner-handler.csv.class</name>
+ <value>org.apache.tajo.storage.CSVFile$CSVScanner</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.scanner-handler.raw.class</name>
+ <value>org.apache.tajo.storage.RawFile$RawFileScanner</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.scanner-handler.rcfile.class</name>
+ <value>org.apache.tajo.storage.rcfile.RCFile$RCFileScanner</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.scanner-handler.rowfile.class</name>
+ <value>org.apache.tajo.storage.RowFile$RowFileScanner</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.scanner-handler.parquet.class</name>
+ <value>org.apache.tajo.storage.parquet.ParquetScanner</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.scanner-handler.sequencefile.class</name>
+ <value>org.apache.tajo.storage.sequencefile.SequenceFileScanner</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.scanner-handler.avro.class</name>
+ <value>org.apache.tajo.storage.avro.AvroScanner</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.scanner-handler.hbase.class</name>
+ <value>org.apache.tajo.storage.hbase.HBaseScanner</value>
+ </property>
+
+ <!--- Appender Handler -->
+ <property>
+ <name>tajo.storage.appender-handler</name>
+ <value>textfile,csv,raw,rcfile,row,parquet,sequencefile,avro,hbase</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.appender-handler.textfile.class</name>
+ <value>org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileAppender</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.appender-handler.csv.class</name>
+ <value>org.apache.tajo.storage.CSVFile$CSVAppender</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.appender-handler.raw.class</name>
+ <value>org.apache.tajo.storage.RawFile$RawFileAppender</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.appender-handler.rcfile.class</name>
+ <value>org.apache.tajo.storage.rcfile.RCFile$RCFileAppender</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.appender-handler.rowfile.class</name>
+ <value>org.apache.tajo.storage.RowFile$RowFileAppender</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.appender-handler.parquet.class</name>
+ <value>org.apache.tajo.storage.parquet.ParquetAppender</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.appender-handler.sequencefile.class</name>
+ <value>org.apache.tajo.storage.sequencefile.SequenceFileAppender</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.appender-handler.avro.class</name>
+ <value>org.apache.tajo.storage.avro.AvroAppender</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.appender-handler.hbase.class</name>
+ <value>org.apache.tajo.storage.hbase.HFileAppender</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.appender-handler.hfile.class</name>
+ <value>org.apache.tajo.storage.hbase.HFileAppender</value>
+ </property>
+</configuration>
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestFrameTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestFrameTuple.java b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestFrameTuple.java
new file mode 100644
index 0000000..0251dc7
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestFrameTuple.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+ /**
+  * Verifies that a {@link FrameTuple} built from two tuples exposes the fields of the first
+  * tuple followed by the fields of the second.
+  */
+ public class TestFrameTuple {
+   /** Number of fields in each fixture tuple. */
+   private static final int FIELDS_PER_TUPLE = 11;
+
+   private Tuple tuple1;
+   private Tuple tuple2;
+
+   /**
+    * Creates a tuple holding one sample value for each of eleven data types. Both fixtures
+    * have identical contents, so they are built by this single helper (each call creates
+    * fresh Datum instances).
+    */
+   private static Tuple createSampleTuple() {
+     Tuple tuple = new VTuple(FIELDS_PER_TUPLE);
+     tuple.put(new Datum[] {
+         DatumFactory.createBool(true),
+         DatumFactory.createBit((byte) 0x99),
+         DatumFactory.createChar('9'),
+         DatumFactory.createInt2((short) 17),
+         DatumFactory.createInt4(59),
+         DatumFactory.createInt8(23L), // upper-case L: a lower-case l reads like the digit 1
+         DatumFactory.createFloat4(77.9f),
+         DatumFactory.createFloat8(271.9f),
+         DatumFactory.createText("hyunsik"),
+         DatumFactory.createBlob("hyunsik".getBytes()),
+         DatumFactory.createInet4("192.168.0.1")
+     });
+     return tuple;
+   }
+
+   @Before
+   public void setUp() throws Exception {
+     tuple1 = createSampleTuple();
+     tuple2 = createSampleTuple();
+   }
+
+   @After
+   public void tearDown() throws Exception {
+     // Nothing to clean up: the fixtures are plain tuples created in setUp().
+   }
+
+   @Test
+   public final void testFrameTuple() {
+     Tuple frame = new FrameTuple(tuple1, tuple2);
+     assertEquals(2 * FIELDS_PER_TUPLE, frame.size());
+     for (int i = 0; i < 2 * FIELDS_PER_TUPLE; i++) {
+       assertTrue(frame.contains(i));
+     }
+
+     // Index 5 is the INT8 field and index 10 the INET4 field of each fixture; inside the
+     // frame, the second tuple's fields are shifted by FIELDS_PER_TUPLE.
+     assertEquals(DatumFactory.createInt8(23L), frame.get(5));
+     assertEquals(DatumFactory.createInt8(23L), frame.get(5 + FIELDS_PER_TUPLE));
+     assertEquals(DatumFactory.createInet4("192.168.0.1"), frame.get(10));
+     assertEquals(DatumFactory.createInet4("192.168.0.1"), frame.get(10 + FIELDS_PER_TUPLE));
+   }
+ }
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestLazyTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestLazyTuple.java b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestLazyTuple.java
new file mode 100644
index 0000000..c6149f7
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestLazyTuple.java
@@ -0,0 +1,258 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.util.BytesUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+ /**
+  * Tests {@link LazyTuple}: reading fields from a delimited text row, explicit puts, presence
+  * checks, equality and hash codes (also against {@link VTuple}), nested tuple puts and cloning.
+  */
+ public class TestLazyTuple {
+
+   Schema schema;
+   byte[][] textRow;
+   byte[] nullbytes;
+   SerializerDeserializer serde;
+
+   /**
+    * Builds a 13-column schema and a matching '|'-separated text row. The last two fields hold
+    * the null marker {@code \N} and the string form of {@link NullDatum}; both are expected to
+    * read back as {@link NullDatum}.
+    */
+   @Before
+   public void setUp() {
+     nullbytes = "\\N".getBytes();
+
+     schema = new Schema();
+     schema.addColumn("col1", TajoDataTypes.Type.BOOLEAN);
+     schema.addColumn("col2", TajoDataTypes.Type.BIT);
+     schema.addColumn("col3", TajoDataTypes.Type.CHAR, 7);
+     schema.addColumn("col4", TajoDataTypes.Type.INT2);
+     schema.addColumn("col5", TajoDataTypes.Type.INT4);
+     schema.addColumn("col6", TajoDataTypes.Type.INT8);
+     schema.addColumn("col7", TajoDataTypes.Type.FLOAT4);
+     schema.addColumn("col8", TajoDataTypes.Type.FLOAT8);
+     schema.addColumn("col9", TajoDataTypes.Type.TEXT);
+     schema.addColumn("col10", TajoDataTypes.Type.BLOB);
+     schema.addColumn("col11", TajoDataTypes.Type.INET4);
+     schema.addColumn("col12", TajoDataTypes.Type.INT4);
+     schema.addColumn("col13", TajoDataTypes.Type.NULL_TYPE);
+
+     StringBuilder sb = new StringBuilder();
+     sb.append(DatumFactory.createBool(true)).append('|');
+     sb.append(new String(DatumFactory.createBit((byte) 0x99).asTextBytes())).append('|');
+     sb.append(DatumFactory.createChar("str")).append('|');
+     sb.append(DatumFactory.createInt2((short) 17)).append('|');
+     sb.append(DatumFactory.createInt4(59)).append('|');
+     sb.append(DatumFactory.createInt8(23L)).append('|');
+     sb.append(DatumFactory.createFloat4(77.9f)).append('|');
+     sb.append(DatumFactory.createFloat8(271.9f)).append('|');
+     sb.append(DatumFactory.createText("str2")).append('|');
+     sb.append(DatumFactory.createBlob("jinho".getBytes())).append('|');
+     sb.append(DatumFactory.createInet4("192.168.0.1")).append('|');
+     sb.append(new String(nullbytes)).append('|');
+     sb.append(NullDatum.get());
+     textRow = BytesUtils.splitPreserveAllTokens(sb.toString().getBytes(), '|');
+     serde = new TextSerializerDeserializer();
+   }
+
+   @Test
+   public void testGetDatum() {
+     LazyTuple t1 = new LazyTuple(schema, textRow, -1, nullbytes, serde);
+     assertEquals(DatumFactory.createBool(true), t1.get(0));
+     assertEquals(DatumFactory.createBit((byte) 0x99), t1.get(1));
+     assertEquals(DatumFactory.createChar("str"), t1.get(2));
+     assertEquals(DatumFactory.createInt2((short) 17), t1.get(3));
+     assertEquals(DatumFactory.createInt4(59), t1.get(4));
+     assertEquals(DatumFactory.createInt8(23L), t1.get(5));
+     assertEquals(DatumFactory.createFloat4(77.9f), t1.get(6));
+     assertEquals(DatumFactory.createFloat8(271.9f), t1.get(7));
+     assertEquals(DatumFactory.createText("str2"), t1.get(8));
+     assertEquals(DatumFactory.createBlob("jinho".getBytes()), t1.get(9));
+     assertEquals(DatumFactory.createInet4("192.168.0.1"), t1.get(10));
+     assertEquals(NullDatum.get(), t1.get(11));
+     assertEquals(NullDatum.get(), t1.get(12));
+   }
+
+   @Test
+   public void testContain() {
+     int colNum = schema.size();
+
+     // With empty backing bytes, only the explicitly put columns are expected to be present.
+     LazyTuple t1 = new LazyTuple(schema, new byte[colNum][], -1);
+     t1.put(0, DatumFactory.createInt4(1));
+     t1.put(3, DatumFactory.createInt4(1));
+     t1.put(7, DatumFactory.createInt4(1));
+
+     assertTrue(t1.contains(0));
+     assertFalse(t1.contains(1));
+     assertFalse(t1.contains(2));
+     assertTrue(t1.contains(3));
+     assertFalse(t1.contains(4));
+     assertFalse(t1.contains(5));
+     assertFalse(t1.contains(6));
+     assertTrue(t1.contains(7));
+     assertFalse(t1.contains(8));
+     assertFalse(t1.contains(9));
+     assertFalse(t1.contains(10));
+     assertFalse(t1.contains(11));
+     assertFalse(t1.contains(12));
+   }
+
+   @Test
+   public void testPut() {
+     int colNum = schema.size();
+     LazyTuple t1 = new LazyTuple(schema, new byte[colNum][], -1);
+     t1.put(0, DatumFactory.createText("str"));
+     t1.put(1, DatumFactory.createInt4(2));
+     t1.put(11, DatumFactory.createFloat4(0.76f));
+
+     assertTrue(t1.contains(0));
+     assertTrue(t1.contains(1));
+
+     // JUnit takes (expected, actual); a zero delta keeps the exact float comparison.
+     assertEquals("str", t1.getText(0));
+     assertEquals(2, t1.get(1).asInt4());
+     assertEquals(0.76f, t1.get(11).asFloat4(), 0.0f);
+   }
+
+   @Test
+   public void testEquals() {
+     int colNum = schema.size();
+     LazyTuple t1 = new LazyTuple(schema, new byte[colNum][], -1);
+     LazyTuple t2 = new LazyTuple(schema, new byte[colNum][], -1);
+
+     t1.put(0, DatumFactory.createInt4(1));
+     t1.put(1, DatumFactory.createInt4(2));
+     t1.put(3, DatumFactory.createInt4(2));
+
+     t2.put(0, DatumFactory.createInt4(1));
+     t2.put(1, DatumFactory.createInt4(2));
+     t2.put(3, DatumFactory.createInt4(2));
+
+     assertEquals(t1, t2);
+
+     Tuple t3 = new VTuple(colNum);
+     t3.put(0, DatumFactory.createInt4(1));
+     t3.put(1, DatumFactory.createInt4(2));
+     t3.put(3, DatumFactory.createInt4(2));
+     assertEquals(t1, t3);
+     assertEquals(t2, t3);
+
+     // An empty tuple must not equal a populated one. assertNotSame would only compare
+     // references, which is trivially true for two distinct objects.
+     LazyTuple t4 = new LazyTuple(schema, new byte[colNum][], -1);
+     assertFalse(t1.equals(t4));
+   }
+
+   @Test
+   public void testHashCode() {
+     int colNum = schema.size();
+     LazyTuple t1 = new LazyTuple(schema, new byte[colNum][], -1);
+     LazyTuple t2 = new LazyTuple(schema, new byte[colNum][], -1);
+
+     t1.put(0, DatumFactory.createInt4(1));
+     t1.put(1, DatumFactory.createInt4(2));
+     t1.put(3, DatumFactory.createInt4(2));
+     t1.put(4, DatumFactory.createText("str"));
+
+     t2.put(0, DatumFactory.createInt4(1));
+     t2.put(1, DatumFactory.createInt4(2));
+     t2.put(3, DatumFactory.createInt4(2));
+     t2.put(4, DatumFactory.createText("str"));
+
+     assertEquals(t1.hashCode(), t2.hashCode());
+
+     Tuple t3 = new VTuple(colNum);
+     t3.put(0, DatumFactory.createInt4(1));
+     t3.put(1, DatumFactory.createInt4(2));
+     t3.put(3, DatumFactory.createInt4(2));
+     t3.put(4, DatumFactory.createText("str"));
+     assertEquals(t1.hashCode(), t3.hashCode());
+     assertEquals(t2.hashCode(), t3.hashCode());
+
+     Tuple t4 = new VTuple(5);
+     t4.put(0, DatumFactory.createInt4(1));
+     t4.put(1, DatumFactory.createInt4(2));
+     t4.put(4, DatumFactory.createInt4(2));
+
+     // Compare the int values themselves: assertNotSame on auto-boxed hash codes compares
+     // Integer references and can pass even when the hash codes are equal.
+     assertFalse(t1.hashCode() == t4.hashCode());
+   }
+
+   @Test
+   public void testPutTuple() {
+     int colNum = schema.size();
+     LazyTuple t1 = new LazyTuple(schema, new byte[colNum][], -1);
+
+     t1.put(0, DatumFactory.createInt4(1));
+     t1.put(1, DatumFactory.createInt4(2));
+     t1.put(2, DatumFactory.createInt4(3));
+
+     Schema schema2 = new Schema();
+     schema2.addColumn("col1", TajoDataTypes.Type.INT8);
+     schema2.addColumn("col2", TajoDataTypes.Type.INT8);
+
+     LazyTuple t2 = new LazyTuple(schema2, new byte[schema2.size()][], -1);
+     t2.put(0, DatumFactory.createInt4(4));
+     t2.put(1, DatumFactory.createInt4(5));
+
+     // Putting t2 at index 3 is expected to place its two fields at positions 3 and 4.
+     t1.put(3, t2);
+
+     for (int i = 0; i < 5; i++) {
+       assertEquals(i + 1, t1.get(i).asInt4());
+     }
+   }
+
+   @Test
+   public void testInvalidNumber() {
+     // Every field is whitespace-padded or empty, so none is expected to parse as a number.
+     byte[][] bytes = BytesUtils.splitPreserveAllTokens(" 1| |2 ||".getBytes(), '|');
+     Schema numericSchema = new Schema();
+     numericSchema.addColumn("col1", TajoDataTypes.Type.INT2);
+     numericSchema.addColumn("col2", TajoDataTypes.Type.INT4);
+     numericSchema.addColumn("col3", TajoDataTypes.Type.INT8);
+     numericSchema.addColumn("col4", TajoDataTypes.Type.FLOAT4);
+     numericSchema.addColumn("col5", TajoDataTypes.Type.FLOAT8);
+
+     LazyTuple tuple = new LazyTuple(numericSchema, bytes, 0);
+     assertEquals(bytes.length, tuple.size());
+
+     for (int i = 0; i < tuple.size(); i++) {
+       assertEquals(NullDatum.get(), tuple.get(i));
+     }
+   }
+
+   @Test
+   public void testClone() throws CloneNotSupportedException {
+     int colNum = schema.size();
+     LazyTuple t1 = new LazyTuple(schema, new byte[colNum][], -1);
+
+     t1.put(0, DatumFactory.createInt4(1));
+     t1.put(1, DatumFactory.createInt4(2));
+     t1.put(3, DatumFactory.createInt4(2));
+     t1.put(4, DatumFactory.createText("str"));
+
+     LazyTuple t2 = (LazyTuple) t1.clone();
+     assertNotSame(t1, t2);
+     assertEquals(t1, t2);
+
+     // The clone shares Datum instances with the original...
+     assertSame(t1.get(4), t2.get(4));
+
+     // ...but clearing the original leaves the two tuples unequal.
+     t1.clear();
+     assertFalse(t1.equals(t2));
+   }
+ }
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestTupleComparator.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestTupleComparator.java b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestTupleComparator.java
new file mode 100644
index 0000000..639ca04
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestTupleComparator.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+ /**
+  * Tests {@link BaseTupleComparator} ordering with a two-column sort key.
+  */
+ public class TestTupleComparator {
+
+   @Before
+   public void setUp() throws Exception {
+     // No shared fixtures.
+   }
+
+   @After
+   public void tearDown() throws Exception {
+     // Nothing to clean up.
+   }
+
+   @Test
+   public final void testCompare() {
+     Schema schema = new Schema();
+     schema.addColumn("col1", Type.INT4);
+     schema.addColumn("col2", Type.INT4);
+     schema.addColumn("col3", Type.INT4);
+     schema.addColumn("col4", Type.INT4);
+     schema.addColumn("col5", Type.TEXT);
+
+     Tuple tuple1 = new VTuple(5);
+     Tuple tuple2 = new VTuple(5);
+
+     tuple1.put(
+         new Datum[] {
+             DatumFactory.createInt4(9),
+             DatumFactory.createInt4(3),
+             DatumFactory.createInt4(33),
+             DatumFactory.createInt4(4),
+             DatumFactory.createText("abc")});
+     tuple2.put(
+         new Datum[] {
+             DatumFactory.createInt4(1),
+             DatumFactory.createInt4(25),
+             DatumFactory.createInt4(109),
+             DatumFactory.createInt4(4),
+             DatumFactory.createText("abd")});
+
+     // col4 is 4 in both tuples, so the second key (col5: "abc" vs "abd") decides the order.
+     SortSpec sortKey1 = new SortSpec(schema.getColumn("col4"), true, false);
+     SortSpec sortKey2 = new SortSpec(schema.getColumn("col5"), true, false);
+
+     BaseTupleComparator tc = new BaseTupleComparator(schema,
+         new SortSpec[] {sortKey1, sortKey2});
+
+     // Only the sign of a compare() result is meaningful (the java.util.Comparator
+     // convention), so check the sign rather than an exact -1/1.
+     assertEquals(-1, Integer.signum(tc.compare(tuple1, tuple2)));
+     assertEquals(1, Integer.signum(tc.compare(tuple2, tuple1)));
+   }
+ }
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestVTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestVTuple.java b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestVTuple.java
new file mode 100644
index 0000000..1bbd9ec
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestVTuple.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+
+import org.apache.tajo.datum.DatumFactory;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+ /**
+  * Tests {@link VTuple}: field presence, puts, equality, hash codes, nested tuple puts and
+  * cloning.
+  */
+ public class TestVTuple {
+
+   @Before
+   public void setUp() throws Exception {
+     // No shared fixtures: every test creates its own tuples.
+   }
+
+   @Test
+   public void testContain() {
+     VTuple t1 = new VTuple(260);
+     t1.put(0, DatumFactory.createInt4(1));
+     t1.put(1, DatumFactory.createInt4(1));
+     t1.put(27, DatumFactory.createInt4(1));
+     t1.put(96, DatumFactory.createInt4(1));
+     t1.put(257, DatumFactory.createInt4(1));
+
+     assertTrue(t1.contains(0));
+     assertTrue(t1.contains(1));
+     assertFalse(t1.contains(2));
+     assertFalse(t1.contains(3));
+     assertFalse(t1.contains(4));
+     assertTrue(t1.contains(27));
+     assertFalse(t1.contains(28));
+     assertFalse(t1.contains(95));
+     assertTrue(t1.contains(96));
+     assertFalse(t1.contains(97));
+     assertTrue(t1.contains(257));
+   }
+
+   @Test
+   public void testPut() {
+     VTuple t1 = new VTuple(260);
+     t1.put(0, DatumFactory.createText("str"));
+     t1.put(1, DatumFactory.createInt4(2));
+     t1.put(257, DatumFactory.createFloat4(0.76f));
+
+     assertTrue(t1.contains(0));
+     assertTrue(t1.contains(1));
+
+     // JUnit takes (expected, actual); a zero delta keeps the exact float comparison.
+     assertEquals("str", t1.getText(0));
+     assertEquals(2, t1.get(1).asInt4());
+     assertEquals(0.76f, t1.get(257).asFloat4(), 0.0f);
+   }
+
+   @Test
+   public void testEquals() {
+     Tuple t1 = new VTuple(5);
+     Tuple t2 = new VTuple(5);
+
+     t1.put(0, DatumFactory.createInt4(1));
+     t1.put(1, DatumFactory.createInt4(2));
+     t1.put(3, DatumFactory.createInt4(2));
+
+     t2.put(0, DatumFactory.createInt4(1));
+     t2.put(1, DatumFactory.createInt4(2));
+     t2.put(3, DatumFactory.createInt4(2));
+
+     assertEquals(t1, t2);
+
+     // t3 holds the same values as t1 but at index 4 instead of 3, so the two must not be
+     // equal. Compare values: assertNotSame would only check that t1 and t3 are different
+     // objects, which is always true.
+     Tuple t3 = new VTuple(5);
+     t3.put(0, DatumFactory.createInt4(1));
+     t3.put(1, DatumFactory.createInt4(2));
+     t3.put(4, DatumFactory.createInt4(2));
+
+     assertFalse(t1.equals(t3));
+   }
+
+   @Test
+   public void testHashCode() {
+     Tuple t1 = new VTuple(5);
+     Tuple t2 = new VTuple(5);
+
+     t1.put(0, DatumFactory.createInt4(1));
+     t1.put(1, DatumFactory.createInt4(2));
+     t1.put(3, DatumFactory.createInt4(2));
+     t1.put(4, DatumFactory.createText("hyunsik"));
+
+     t2.put(0, DatumFactory.createInt4(1));
+     t2.put(1, DatumFactory.createInt4(2));
+     t2.put(3, DatumFactory.createInt4(2));
+     t2.put(4, DatumFactory.createText("hyunsik"));
+
+     assertEquals(t1.hashCode(), t2.hashCode());
+
+     Tuple t3 = new VTuple(5);
+     t3.put(0, DatumFactory.createInt4(1));
+     t3.put(1, DatumFactory.createInt4(2));
+     t3.put(4, DatumFactory.createInt4(2));
+
+     // Compare the int values themselves: assertNotSame on auto-boxed hash codes compares
+     // Integer references and can pass even when the hash codes are equal.
+     assertFalse(t1.hashCode() == t3.hashCode());
+   }
+
+   @Test
+   public void testPutTuple() {
+     Tuple t1 = new VTuple(5);
+
+     t1.put(0, DatumFactory.createInt4(1));
+     t1.put(1, DatumFactory.createInt4(2));
+     t1.put(2, DatumFactory.createInt4(3));
+
+     Tuple t2 = new VTuple(2);
+     t2.put(0, DatumFactory.createInt4(4));
+     t2.put(1, DatumFactory.createInt4(5));
+
+     // Putting t2 at index 3 is expected to place its two fields at positions 3 and 4.
+     t1.put(3, t2);
+
+     for (int i = 0; i < 5; i++) {
+       assertEquals(i + 1, t1.get(i).asInt4());
+     }
+   }
+
+   @Test
+   public void testClone() throws CloneNotSupportedException {
+     Tuple t1 = new VTuple(5);
+
+     t1.put(0, DatumFactory.createInt4(1));
+     t1.put(1, DatumFactory.createInt4(2));
+     t1.put(3, DatumFactory.createInt4(2));
+     t1.put(4, DatumFactory.createText("str"));
+
+     VTuple t2 = (VTuple) t1.clone();
+     assertNotSame(t1, t2);
+     assertEquals(t1, t2);
+
+     // The clone shares Datum instances with the original...
+     assertSame(t1.get(4), t2.get(4));
+
+     // ...but clearing the original leaves the two tuples unequal.
+     t1.clear();
+     assertFalse(t1.equals(t2));
+   }
+ }
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml b/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml
new file mode 100644
index 0000000..d1c561b
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml
@@ -0,0 +1,164 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+
+<configuration>
+ <property>
+ <name>fs.s3.impl</name>
+ <value>org.apache.tajo.storage.s3.SmallBlockS3FileSystem</value>
+ </property>
+
+ <!-- Storage Manager Configuration -->
+ <property>
+ <name>tajo.storage.manager.hdfs.class</name>
+ <value>org.apache.tajo.storage.FileStorageManager</value>
+ </property>
+ <property>
+ <name>tajo.storage.manager.hbase.class</name>
+ <value>org.apache.tajo.storage.hbase.HBaseStorageManager</value>
+ </property>
+
+ <!--- Registered Scanner Handler -->
+ <property>
+ <name>tajo.storage.scanner-handler</name>
+ <value>csv,raw,rcfile,row,trevni,parquet,sequencefile,avro</value>
+ </property>
+
+ <!--- Fragment Class Configurations -->
+ <property>
+ <name>tajo.storage.fragment.csv.class</name>
+ <value>org.apache.tajo.storage.fragment.FileFragment</value>
+ </property>
+ <property>
+ <name>tajo.storage.fragment.raw.class</name>
+ <value>org.apache.tajo.storage.fragment.FileFragment</value>
+ </property>
+ <property>
+ <name>tajo.storage.fragment.rcfile.class</name>
+ <value>org.apache.tajo.storage.fragment.FileFragment</value>
+ </property>
+ <property>
+ <name>tajo.storage.fragment.row.class</name>
+ <value>org.apache.tajo.storage.fragment.FileFragment</value>
+ </property>
+ <property>
+ <name>tajo.storage.fragment.trevni.class</name>
+ <value>org.apache.tajo.storage.fragment.FileFragment</value>
+ </property>
+ <property>
+ <name>tajo.storage.fragment.parquet.class</name>
+ <value>org.apache.tajo.storage.fragment.FileFragment</value>
+ </property>
+ <property>
+ <name>tajo.storage.fragment.sequencefile.class</name>
+ <value>org.apache.tajo.storage.fragment.FileFragment</value>
+ </property>
+ <property>
+ <name>tajo.storage.fragment.avro.class</name>
+ <value>org.apache.tajo.storage.fragment.FileFragment</value>
+ </property>
+
+ <!--- Scanner Handler -->
+ <property>
+ <name>tajo.storage.scanner-handler.csv.class</name>
+ <value>org.apache.tajo.storage.CSVFile$CSVScanner</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.scanner-handler.raw.class</name>
+ <value>org.apache.tajo.storage.RawFile$RawFileScanner</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.scanner-handler.rcfile.class</name>
+ <value>org.apache.tajo.storage.rcfile.RCFile$RCFileScanner</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.scanner-handler.rowfile.class</name>
+ <value>org.apache.tajo.storage.RowFile$RowFileScanner</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.scanner-handler.trevni.class</name>
+ <value>org.apache.tajo.storage.trevni.TrevniScanner</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.scanner-handler.parquet.class</name>
+ <value>org.apache.tajo.storage.parquet.ParquetScanner</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.scanner-handler.sequencefile.class</name>
+ <value>org.apache.tajo.storage.sequencefile.SequenceFileScanner</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.scanner-handler.avro.class</name>
+ <value>org.apache.tajo.storage.avro.AvroScanner</value>
+ </property>
+
+ <!--- Appender Handler -->
+ <property>
+ <name>tajo.storage.appender-handler</name>
+ <value>csv,raw,rcfile,row,trevni,parquet,sequencefile,avro</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.appender-handler.csv.class</name>
+ <value>org.apache.tajo.storage.CSVFile$CSVAppender</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.appender-handler.raw.class</name>
+ <value>org.apache.tajo.storage.RawFile$RawFileAppender</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.appender-handler.rcfile.class</name>
+ <value>org.apache.tajo.storage.rcfile.RCFile$RCFileAppender</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.appender-handler.rowfile.class</name>
+ <value>org.apache.tajo.storage.RowFile$RowFileAppender</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.appender-handler.trevni.class</name>
+ <value>org.apache.tajo.storage.trevni.TrevniAppender</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.appender-handler.parquet.class</name>
+ <value>org.apache.tajo.storage.parquet.ParquetAppender</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.appender-handler.sequencefile.class</name>
+ <value>org.apache.tajo.storage.sequencefile.SequenceFileAppender</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.appender-handler.avro.class</name>
+ <value>org.apache.tajo.storage.avro.AvroAppender</value>
+ </property>
+</configuration>
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hbase/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/pom.xml b/tajo-storage/tajo-storage-hbase/pom.xml
new file mode 100644
index 0000000..e37149d
--- /dev/null
+++ b/tajo-storage/tajo-storage-hbase/pom.xml
@@ -0,0 +1,349 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Copyright 2012 Database Lab., Korea Univ.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>tajo-project</artifactId>
+ <groupId>org.apache.tajo</groupId>
+ <version>0.9.1-SNAPSHOT</version>
+ <relativePath>../../tajo-project</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>tajo-storage-hbase</artifactId>
+ <packaging>jar</packaging>
+ <name>Tajo HBase Storage</name>
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+ </properties>
+
+ <repositories>
+ <repository>
+ <id>repository.jboss.org</id>
+ <url>https://repository.jboss.org/nexus/content/repositories/releases/
+ </url>
+ <snapshots>
+ <enabled>false</enabled>
+ </snapshots>
+ </repository>
+ </repositories>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>1.6</source>
+ <target>1.6</target>
+ <encoding>${project.build.sourceEncoding}</encoding>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.rat</groupId>
+ <artifactId>apache-rat-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>verify</phase>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <systemProperties>
+ <tajo.test>TRUE</tajo.test>
+ </systemProperties>
+ <argLine>-Xms512m -Xmx1024m -XX:MaxPermSize=128m -Dfile.encoding=UTF-8</argLine>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <version>2.4</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-antrun-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>create-protobuf-generated-sources-directory</id>
+ <phase>initialize</phase>
+ <configuration>
+ <target>
+ <mkdir dir="target/generated-sources/proto" />
+ </target>
+ </configuration>
+ <goals>
+ <goal>run</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>exec-maven-plugin</artifactId>
+ <version>1.2</version>
+ <executions>
+ <execution>
+ <id>generate-sources</id>
+ <phase>generate-sources</phase>
+ <configuration>
+ <executable>protoc</executable>
+ <arguments>
+ <argument>-Isrc/main/proto/</argument>
+ <argument>--proto_path=../../tajo-common/src/main/proto</argument>
+ <argument>--proto_path=../../tajo-catalog/tajo-catalog-common/src/main/proto</argument>
+ <argument>--java_out=target/generated-sources/proto</argument>
+ <argument>src/main/proto/StorageFragmentProtos.proto</argument>
+ </arguments>
+ </configuration>
+ <goals>
+ <goal>exec</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <version>1.5</version>
+ <executions>
+ <execution>
+ <id>add-source</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>target/generated-sources/proto</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-report-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.tajo</groupId>
+ <artifactId>tajo-common</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tajo</groupId>
+ <artifactId>tajo-catalog-common</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tajo</groupId>
+ <artifactId>tajo-plan</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tajo</groupId>
+ <artifactId>tajo-storage-common</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <artifactId>zookeeper</artifactId>
+ <groupId>org.apache.zookeeper</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>slf4j-api</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jersey-json</artifactId>
+ <groupId>com.sun.jersey</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>commons-el</groupId>
+ <artifactId>commons-el</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>tomcat</groupId>
+ <artifactId>jasper-runtime</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>tomcat</groupId>
+ <artifactId>jasper-compiler</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jsp-2.1-jetty</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey.jersey-test-framework</groupId>
+ <artifactId>jersey-test-framework-grizzly2</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-minicluster</artifactId>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>commons-el</groupId>
+ <artifactId>commons-el</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>tomcat</groupId>
+ <artifactId>jasper-runtime</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>tomcat</groupId>
+ <artifactId>jasper-compiler</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jsp-2.1-jetty</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey.jersey-test-framework</groupId>
+ <artifactId>jersey-test-framework-grizzly2</artifactId>
+ </exclusion>
+ <exclusion>
+ <artifactId>hadoop-yarn-server-tests</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>hadoop-mapreduce-client-app</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>hadoop-yarn-api</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>hadoop-mapreduce-client-hs</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-server</artifactId>
+ <version>${hbase.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-client</artifactId>
+ <version>${hbase.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+
+ <profiles>
+ <profile>
+ <id>docs</id>
+ <activation>
+ <activeByDefault>false</activeByDefault>
+ </activation>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-javadoc-plugin</artifactId>
+ <executions>
+ <execution>
+ <!-- build javadoc jars per jar for publishing to maven -->
+ <id>module-javadocs</id>
+ <phase>package</phase>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ <configuration>
+ <destDir>${project.build.directory}</destDir>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
+
+ <reporting>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-report-plugin</artifactId>
+ <version>2.15</version>
+ </plugin>
+ </plugins>
+ </reporting>
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java
new file mode 100644
index 0000000..8615235
--- /dev/null
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.hbase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.storage.Appender;
+import org.apache.tajo.storage.TableStatistics;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.util.TUtil;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * An abstract class for HBase appender.
+ */
+public abstract class AbstractHBaseAppender implements Appender {
+ protected Configuration conf;
+ protected Schema schema;
+ protected TableMeta meta;
+ protected QueryUnitAttemptId taskAttemptId;
+ protected Path stagingDir;
+ protected boolean inited = false;
+
+ protected ColumnMapping columnMapping;
+ protected TableStatistics stats;
+ protected boolean enabledStats;
+
+ protected int columnNum;
+
+ protected byte[][][] mappingColumnFamilies;
+ protected boolean[] isBinaryColumns;
+ protected boolean[] isRowKeyMappings;
+ protected boolean[] isColumnKeys;
+ protected boolean[] isColumnValues;
+ protected int[] rowKeyFieldIndexes;
+ protected int[] rowkeyColumnIndexes;
+ protected char rowKeyDelimiter;
+
+ // the following four variables are used for '<cfname>:key:' or '<cfname>:value:' mapping
+ protected int[] columnKeyValueDataIndexes;
+ protected byte[][] columnKeyDatas;
+ protected byte[][] columnValueDatas;
+ protected byte[][] columnKeyCfNames;
+
+ protected KeyValue[] keyValues;
+
+ public AbstractHBaseAppender(Configuration conf, QueryUnitAttemptId taskAttemptId,
+ Schema schema, TableMeta meta, Path stagingDir) {
+ this.conf = conf;
+ this.schema = schema;
+ this.meta = meta;
+ this.stagingDir = stagingDir;
+ this.taskAttemptId = taskAttemptId;
+ }
+
+ @Override
+ public void init() throws IOException {
+ if (inited) {
+ throw new IllegalStateException("FileAppender is already initialized.");
+ }
+ inited = true;
+ if (enabledStats) {
+ stats = new TableStatistics(this.schema);
+ }
+ columnMapping = new ColumnMapping(schema, meta);
+
+ mappingColumnFamilies = columnMapping.getMappingColumns();
+
+ isRowKeyMappings = columnMapping.getIsRowKeyMappings();
+ List<Integer> rowkeyColumnIndexList = new ArrayList<Integer>();
+ for (int i = 0; i < isRowKeyMappings.length; i++) {
+ if (isRowKeyMappings[i]) {
+ rowkeyColumnIndexList.add(i);
+ }
+ }
+ rowkeyColumnIndexes = TUtil.toArray(rowkeyColumnIndexList);
+
+ isBinaryColumns = columnMapping.getIsBinaryColumns();
+ isColumnKeys = columnMapping.getIsColumnKeys();
+ isColumnValues = columnMapping.getIsColumnValues();
+ rowKeyDelimiter = columnMapping.getRowKeyDelimiter();
+ rowKeyFieldIndexes = columnMapping.getRowKeyFieldIndexes();
+
+ this.columnNum = schema.size();
+
+ // In the case of '<cfname>:key:' or '<cfname>:value:' KeyValue object should be set with the qualifier and value
+ // which are mapped to the same column family.
+ columnKeyValueDataIndexes = new int[isColumnKeys.length];
+ int index = 0;
+ int numKeyValues = 0;
+ Map<String, Integer> cfNameIndexMap = new HashMap<String, Integer>();
+ for (int i = 0; i < isColumnKeys.length; i++) {
+ if (isRowKeyMappings[i]) {
+ continue;
+ }
+ if (isColumnKeys[i] || isColumnValues[i]) {
+ String cfName = new String(mappingColumnFamilies[i][0]);
+ if (!cfNameIndexMap.containsKey(cfName)) {
+ cfNameIndexMap.put(cfName, index);
+ columnKeyValueDataIndexes[i] = index;
+ index++;
+ numKeyValues++;
+ } else {
+ columnKeyValueDataIndexes[i] = cfNameIndexMap.get(cfName);
+ }
+ } else {
+ numKeyValues++;
+ }
+ }
+ columnKeyCfNames = new byte[cfNameIndexMap.size()][];
+ for (Map.Entry<String, Integer> entry: cfNameIndexMap.entrySet()) {
+ columnKeyCfNames[entry.getValue()] = entry.getKey().getBytes();
+ }
+ columnKeyDatas = new byte[cfNameIndexMap.size()][];
+ columnValueDatas = new byte[cfNameIndexMap.size()][];
+
+ keyValues = new KeyValue[numKeyValues];
+ }
+
+ private ByteArrayOutputStream bout = new ByteArrayOutputStream();
+
+ protected byte[] getRowKeyBytes(Tuple tuple) throws IOException {
+ Datum datum;
+ byte[] rowkey;
+ if (rowkeyColumnIndexes.length > 1) {
+ bout.reset();
+ for (int i = 0; i < rowkeyColumnIndexes.length; i++) {
+ datum = tuple.get(rowkeyColumnIndexes[i]);
+ if (isBinaryColumns[rowkeyColumnIndexes[i]]) {
+ rowkey = HBaseBinarySerializerDeserializer.serialize(schema.getColumn(rowkeyColumnIndexes[i]), datum);
+ } else {
+ rowkey = HBaseTextSerializerDeserializer.serialize(schema.getColumn(rowkeyColumnIndexes[i]), datum);
+ }
+ bout.write(rowkey);
+ if (i < rowkeyColumnIndexes.length - 1) {
+ bout.write(rowKeyDelimiter);
+ }
+ }
+ rowkey = bout.toByteArray();
+ } else {
+ int index = rowkeyColumnIndexes[0];
+ datum = tuple.get(index);
+ if (isBinaryColumns[index]) {
+ rowkey = HBaseBinarySerializerDeserializer.serialize(schema.getColumn(index), datum);
+ } else {
+ rowkey = HBaseTextSerializerDeserializer.serialize(schema.getColumn(index), datum);
+ }
+ }
+
+ return rowkey;
+ }
+
+ protected void readKeyValues(Tuple tuple, byte[] rowkey) throws IOException {
+ int keyValIndex = 0;
+ for (int i = 0; i < columnNum; i++) {
+ if (isRowKeyMappings[i]) {
+ continue;
+ }
+ Datum datum = tuple.get(i);
+ byte[] value;
+ if (isBinaryColumns[i]) {
+ value = HBaseBinarySerializerDeserializer.serialize(schema.getColumn(i), datum);
+ } else {
+ value = HBaseTextSerializerDeserializer.serialize(schema.getColumn(i), datum);
+ }
+
+ if (isColumnKeys[i]) {
+ columnKeyDatas[columnKeyValueDataIndexes[i]] = value;
+ } else if (isColumnValues[i]) {
+ columnValueDatas[columnKeyValueDataIndexes[i]] = value;
+ } else {
+ keyValues[keyValIndex] = new KeyValue(rowkey, mappingColumnFamilies[i][0], mappingColumnFamilies[i][1], value);
+ keyValIndex++;
+ }
+ }
+
+ for (int i = 0; i < columnKeyDatas.length; i++) {
+ keyValues[keyValIndex++] = new KeyValue(rowkey, columnKeyCfNames[i], columnKeyDatas[i], columnValueDatas[i]);
+ }
+ }
+
+ @Override
+ public void enableStats() {
+ enabledStats = true;
+ }
+
+ @Override
+ public TableStats getStats() {
+ if (enabledStats) {
+ return stats.getTableStat();
+ } else {
+ return null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AddSortForInsertRewriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AddSortForInsertRewriter.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AddSortForInsertRewriter.java
new file mode 100644
index 0000000..79161cc
--- /dev/null
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AddSortForInsertRewriter.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.hbase;
+
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.PlanningException;
+import org.apache.tajo.plan.logical.LogicalNode;
+import org.apache.tajo.plan.logical.LogicalRootNode;
+import org.apache.tajo.plan.logical.SortNode;
+import org.apache.tajo.plan.logical.SortNode.SortPurpose;
+import org.apache.tajo.plan.logical.UnaryNode;
+import org.apache.tajo.plan.rewrite.RewriteRule;
+import org.apache.tajo.plan.util.PlannerUtil;
+
+public class AddSortForInsertRewriter implements RewriteRule {
+ private int[] sortColumnIndexes;
+ private Column[] sortColumns;
+ public AddSortForInsertRewriter(TableDesc tableDesc, Column[] sortColumns) {
+ this.sortColumns = sortColumns;
+ this.sortColumnIndexes = new int[sortColumns.length];
+
+ Schema tableSchema = tableDesc.getSchema();
+ for (int i = 0; i < sortColumns.length; i++) {
+ sortColumnIndexes[i] = tableSchema.getColumnId(sortColumns[i].getQualifiedName());
+ }
+ }
+
+ @Override
+ public String getName() {
+ return "AddSortForInsertRewriter";
+ }
+
+ @Override
+ public boolean isEligible(LogicalPlan plan) {
+ StoreType storeType = PlannerUtil.getStoreType(plan);
+ return storeType != null;
+ }
+
+ @Override
+ public LogicalPlan rewrite(LogicalPlan plan) throws PlanningException {
+ LogicalRootNode rootNode = plan.getRootBlock().getRoot();
+ UnaryNode insertNode = rootNode.getChild();
+ LogicalNode childNode = insertNode.getChild();
+
+ Schema sortSchema = childNode.getOutSchema();
+ SortNode sortNode = plan.createNode(SortNode.class);
+ sortNode.setSortPurpose(SortPurpose.STORAGE_SPECIFIED);
+ sortNode.setInSchema(sortSchema);
+ sortNode.setOutSchema(sortSchema);
+
+ SortSpec[] sortSpecs = new SortSpec[sortColumns.length];
+ int index = 0;
+
+ for (int i = 0; i < sortColumnIndexes.length; i++) {
+ Column sortColumn = sortSchema.getColumn(sortColumnIndexes[i]);
+ if (sortColumn == null) {
+ throw new PlanningException("Can't fine proper sort column:" + sortColumns[i]);
+ }
+ sortSpecs[index++] = new SortSpec(sortColumn, true, true);
+ }
+ sortNode.setSortSpecs(sortSpecs);
+
+ sortNode.setChild(insertNode.getChild());
+ insertNode.setChild(sortNode);
+ plan.getRootBlock().registerNode(sortNode);
+
+ return plan;
+ }
+}