You are viewing a plain text version of this content. The canonical link for it was a hyperlink that did not survive the plain-text conversion.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/09/16 13:06:10 UTC
[3/6] TAJO-907: Implement off-heap tuple block and zero-copy tuple.
http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/storage/BaseTupleComparator.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/BaseTupleComparator.java b/tajo-storage/src/main/java/org/apache/tajo/storage/BaseTupleComparator.java
new file mode 100644
index 0000000..c3f77e7
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/BaseTupleComparator.java
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.common.ProtoObject;
+import org.apache.tajo.datum.Datum;
+
+import java.util.Arrays;
+
+import static org.apache.tajo.catalog.proto.CatalogProtos.TupleComparatorSpecProto;
+import static org.apache.tajo.index.IndexProtos.TupleComparatorProto;
+
+/**
+ * The default comparator for {@link Tuple}s: orders tuples by a list of sort keys, each with its
+ * own direction (ascending/descending) and null placement (nulls first/last).
+ *
+ * <p>{@link #compare(Tuple, Tuple)} keeps its working values in locals, so it does not mutate
+ * the comparator.
+ *
+ * @see Tuple
+ */
+public class BaseTupleComparator extends TupleComparator implements ProtoObject<TupleComparatorProto> {
+  private final Schema schema;
+  private final SortSpec [] sortSpecs;
+  /** Column index of each sort key within the input schema. */
+  private final int[] sortKeyIds;
+  /** Per sort key: {@code true} for ascending order. */
+  private final boolean[] asc;
+  /** Per sort key: {@code true} if nulls sort before non-null values. */
+  private final boolean[] nullFirsts;
+
+  /**
+   * Resolves each sort key to its column index in {@code schema}.
+   *
+   * @param schema The schema of input tuples
+   * @param sortKeys The description of sort keys; must not be empty
+   * @throws IllegalArgumentException if {@code sortKeys} is empty
+   */
+  public BaseTupleComparator(Schema schema, SortSpec[] sortKeys) {
+    Preconditions.checkArgument(sortKeys.length > 0,
+        "At least one sort key must be specified.");
+
+    this.schema = schema;
+    this.sortSpecs = sortKeys;
+    this.sortKeyIds = new int[sortKeys.length];
+    this.asc = new boolean[sortKeys.length];
+    this.nullFirsts = new boolean[sortKeys.length];
+    for (int i = 0; i < sortKeys.length; i++) {
+      // Qualified keys are looked up by qualified name, unqualified ones by simple name.
+      if (sortKeys[i].getSortKey().hasQualifier()) {
+        this.sortKeyIds[i] = schema.getColumnId(sortKeys[i].getSortKey().getQualifiedName());
+      } else {
+        this.sortKeyIds[i] = schema.getColumnIdByName(sortKeys[i].getSortKey().getSimpleName());
+      }
+
+      this.asc[i] = sortKeys[i].isAscending();
+      this.nullFirsts[i] = sortKeys[i].isNullFirst();
+    }
+  }
+
+  /**
+   * Rebuilds a comparator from the form produced by {@link #getProto()}.
+   *
+   * @param proto the serialized comparator
+   */
+  public BaseTupleComparator(TupleComparatorProto proto) {
+    this.schema = new Schema(proto.getSchema());
+
+    this.sortSpecs = new SortSpec[proto.getSortSpecsCount()];
+    for (int i = 0; i < proto.getSortSpecsCount(); i++) {
+      sortSpecs[i] = new SortSpec(proto.getSortSpecs(i));
+    }
+
+    this.sortKeyIds = new int[proto.getCompSpecsCount()];
+    this.asc = new boolean[proto.getCompSpecsCount()];
+    this.nullFirsts = new boolean[proto.getCompSpecsCount()];
+
+    for (int i = 0; i < proto.getCompSpecsCount(); i++) {
+      TupleComparatorSpecProto sortSpecProto = proto.getCompSpecs(i);
+      sortKeyIds[i] = sortSpecProto.getColumnId();
+      asc[i] = sortSpecProto.getAscending();
+      nullFirsts[i] = sortSpecProto.getNullFirst();
+    }
+  }
+
+  /** @return the schema of the input tuples */
+  public Schema getSchema() {
+    return schema;
+  }
+
+  /** @return the sort specifications; the internal array, not a copy */
+  public SortSpec [] getSortSpecs() {
+    return sortSpecs;
+  }
+
+  /** @return the column index of each sort key; the internal array, not a copy */
+  public int [] getSortKeyIds() {
+    return sortKeyIds;
+  }
+
+  /** @return {@code true} if the first sort key is ascending */
+  public boolean isAscendingFirstKey() {
+    return this.asc[0];
+  }
+
+  /**
+   * Compares two tuples sort key by sort key; the first key that differs decides the result.
+   *
+   * <p>Null placement follows each key's null-first flag and is independent of its direction:
+   * a null sorts after any non-null value unless null-first is set. Two nulls compare equal.
+   */
+  @Override
+  public int compare(Tuple tuple1, Tuple tuple2) {
+    for (int i = 0; i < sortKeyIds.length; i++) {
+      Datum left = tuple1.get(sortKeyIds[i]);
+      Datum right = tuple2.get(sortKeyIds[i]);
+      boolean leftNull = left.isNull();
+      boolean rightNull = right.isNull();
+
+      int compVal;
+      if (leftNull && rightNull) {
+        // Decide ties with isNull() rather than equals(): two nulls must always compare equal,
+        // otherwise compare(a, b) and compare(b, a) could both be positive.
+        compVal = 0;
+      } else if (leftNull || rightNull) {
+        // Nulls go last by default; flip the sign when this key wants nulls first.
+        compVal = leftNull ? 1 : -1;
+        if (nullFirsts[i]) {
+          compVal = -compVal;
+        }
+      } else {
+        compVal = asc[i] ? left.compareTo(right) : right.compareTo(left);
+      }
+
+      if (compVal != 0) {
+        return compVal;
+      }
+    }
+    return 0;
+  }
+
+  /**
+   * Hashes the same fields that {@link #equals(Object)} compares. Each array is hashed by
+   * content: handing an {@code int[]} directly to {@code Objects.hashCode(Object...)} would
+   * hash the array's identity and break the equals/hashCode contract.
+   */
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(Arrays.hashCode(sortKeyIds), Arrays.hashCode(asc),
+        Arrays.hashCode(nullFirsts));
+  }
+
+  /**
+   * Two comparators are equal when they have the same sort key columns, directions and null
+   * placements. The schema and sort specs are not compared.
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (!(obj instanceof BaseTupleComparator)) {
+      return false;
+    }
+    BaseTupleComparator other = (BaseTupleComparator) obj;
+    return Arrays.equals(sortKeyIds, other.sortKeyIds)
+        && Arrays.equals(asc, other.asc)
+        && Arrays.equals(nullFirsts, other.nullFirsts);
+  }
+
+  /**
+   * Serializes the schema, the sort specs and, per sort key, its column index, direction and
+   * null placement.
+   */
+  @Override
+  public TupleComparatorProto getProto() {
+    TupleComparatorProto.Builder builder = TupleComparatorProto.newBuilder();
+    builder.setSchema(schema.getProto());
+    for (SortSpec sortSpec : sortSpecs) {
+      builder.addSortSpecs(sortSpec.getProto());
+    }
+
+    for (int i = 0; i < sortKeyIds.length; i++) {
+      TupleComparatorSpecProto.Builder sortSpecBuilder = TupleComparatorSpecProto.newBuilder();
+      sortSpecBuilder.setColumnId(sortKeyIds[i]);
+      sortSpecBuilder.setAscending(asc[i]);
+      sortSpecBuilder.setNullFirst(nullFirsts[i]);
+      builder.addCompSpecs(sortSpecBuilder);
+    }
+
+    return builder.build();
+  }
+
+  /** Describes each sort key's column index, direction and null placement. */
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+
+    String prefix = "";
+    for (int i = 0; i < sortKeyIds.length; i++) {
+      sb.append(prefix).append("SortKeyId=").append(sortKeyIds[i])
+          .append(",Asc=").append(asc[i])
+          .append(",NullFirst=").append(nullFirsts[i]);
+      prefix = " ,";
+    }
+    return sb.toString();
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java
index 42b49a8..609a3df 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java
@@ -36,7 +36,7 @@ public class BinarySerializerDeserializer implements SerializerDeserializer {
throws IOException {
byte[] bytes;
int length = 0;
- if (datum == null || datum instanceof NullDatum) {
+ if (datum == null || datum.isNull()) {
return 0;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/storage/FrameTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/FrameTuple.java b/tajo-storage/src/main/java/org/apache/tajo/storage/FrameTuple.java
index e0f8a2e..8b7e2e0 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/FrameTuple.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/FrameTuple.java
@@ -23,7 +23,7 @@ package org.apache.tajo.storage;
import com.google.common.base.Preconditions;
import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.datum.IntervalDatum;
import org.apache.tajo.datum.ProtobufDatum;
import org.apache.tajo.exception.UnsupportedException;
@@ -71,7 +71,12 @@ public class FrameTuple implements Tuple, Cloneable {
@Override
public boolean isNull(int fieldid) {
- return get(fieldid) instanceof NullDatum;
+ return get(fieldid).isNull();
+ }
+
+ @Override
+ public boolean isNotNull(int fieldid) {
+ return !isNull(fieldid);
}
@Override
@@ -177,6 +182,11 @@ public class FrameTuple implements Tuple, Cloneable {
}
@Override
+ public IntervalDatum getInterval(int fieldId) {
+ return (IntervalDatum) get(fieldId);
+ }
+
+ @Override
public char [] getUnicodeChars(int fieldId) {
return get(fieldId).asUnicodeChars();
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java b/tajo-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java
index 167e4a8..bfbe478 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java
@@ -20,6 +20,7 @@ package org.apache.tajo.storage;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.IntervalDatum;
import org.apache.tajo.datum.NullDatum;
import org.apache.tajo.datum.ProtobufDatum;
import org.apache.tajo.exception.UnsupportedException;
@@ -68,7 +69,12 @@ public class LazyTuple implements Tuple, Cloneable {
@Override
public boolean isNull(int fieldid) {
- return get(fieldid) instanceof NullDatum;
+ return get(fieldid).isNull();
+ }
+
+ @Override
+ public boolean isNotNull(int fieldid) {
+ return !isNull(fieldid);
}
@Override
@@ -199,6 +205,11 @@ public class LazyTuple implements Tuple, Cloneable {
}
@Override
+ public IntervalDatum getInterval(int fieldId) {
+ return (IntervalDatum) get(fieldId);
+ }
+
+ @Override
public char[] getUnicodeChars(int fieldId) {
return get(fieldId).asUnicodeChars();
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/storage/MemoryUtil.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/MemoryUtil.java b/tajo-storage/src/main/java/org/apache/tajo/storage/MemoryUtil.java
deleted file mode 100644
index f19b61f..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/MemoryUtil.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.tajo.datum.*;
-import org.apache.tajo.util.ClassSize;
-
-public class MemoryUtil {
-
- /** Overhead for an NullDatum */
- public static final long NULL_DATUM;
-
- /** Overhead for an BoolDatum */
- public static final long BOOL_DATUM;
-
- /** Overhead for an CharDatum */
- public static final long CHAR_DATUM;
-
- /** Overhead for an BitDatum */
- public static final long BIT_DATUM;
-
- /** Overhead for an Int2Datum */
- public static final long INT2_DATUM;
-
- /** Overhead for an Int4Datum */
- public static final long INT4_DATUM;
-
- /** Overhead for an Int8Datum */
- public static final long INT8_DATUM;
-
- /** Overhead for an Float4Datum */
- public static final long FLOAT4_DATUM;
-
- /** Overhead for an Float8Datum */
- public static final long FLOAT8_DATUM;
-
- /** Overhead for an TextDatum */
- public static final long TEXT_DATUM;
-
- /** Overhead for an BlobDatum */
- public static final long BLOB_DATUM;
-
- /** Overhead for an DateDatum */
- public static final long DATE_DATUM;
-
- /** Overhead for an TimeDatum */
- public static final long TIME_DATUM;
-
- /** Overhead for an TimestampDatum */
- public static final long TIMESTAMP_DATUM;
-
- static {
- NULL_DATUM = ClassSize.estimateBase(NullDatum.class, false);
-
- CHAR_DATUM = ClassSize.estimateBase(CharDatum.class, false);
-
- BOOL_DATUM = ClassSize.estimateBase(BooleanDatum.class, false);
-
- BIT_DATUM = ClassSize.estimateBase(BitDatum.class, false);
-
- INT2_DATUM = ClassSize.estimateBase(Int2Datum.class, false);
-
- INT4_DATUM = ClassSize.estimateBase(Int4Datum.class, false);
-
- INT8_DATUM = ClassSize.estimateBase(Int8Datum.class, false);
-
- FLOAT4_DATUM = ClassSize.estimateBase(Float4Datum.class, false);
-
- FLOAT8_DATUM = ClassSize.estimateBase(Float8Datum.class, false);
-
- TEXT_DATUM = ClassSize.estimateBase(TextDatum.class, false);
-
- BLOB_DATUM = ClassSize.estimateBase(BlobDatum.class, false);
-
- DATE_DATUM = ClassSize.estimateBase(DateDatum.class, false);
-
- TIME_DATUM = ClassSize.estimateBase(TimeDatum.class, false);
-
- TIMESTAMP_DATUM = ClassSize.estimateBase(TimestampDatum.class, false);
- }
-
- public static long calculateMemorySize(Tuple tuple) {
- long total = ClassSize.OBJECT;
- for (Datum datum : tuple.getValues()) {
- switch (datum.type()) {
-
- case NULL_TYPE:
- total += NULL_DATUM;
- break;
-
- case BOOLEAN:
- total += BOOL_DATUM;
- break;
-
- case BIT:
- total += BIT_DATUM;
- break;
-
- case CHAR:
- total += CHAR_DATUM + datum.size();
- break;
-
- case INT1:
- case INT2:
- total += INT2_DATUM;
- break;
-
- case INT4:
- total += INT4_DATUM;
- break;
-
- case INT8:
- total += INT8_DATUM;
- break;
-
- case FLOAT4:
- total += FLOAT4_DATUM;
- break;
-
- case FLOAT8:
- total += FLOAT4_DATUM;
- break;
-
- case TEXT:
- total += TEXT_DATUM + datum.size();
- break;
-
- case DATE:
- total += DATE_DATUM;
- break;
-
- case TIME:
- total += TIME_DATUM;
- break;
-
- case TIMESTAMP:
- total += TIMESTAMP_DATUM;
- break;
-
- default:
- break;
- }
- }
-
- return total;
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java b/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java
index 1f57675..7f729e1 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java
@@ -31,6 +31,7 @@ import org.apache.tajo.common.TajoDataTypes.DataType;
import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.datum.NullDatum;
import org.apache.tajo.datum.ProtobufDatumFactory;
+import org.apache.tajo.util.UnsafeUtil;
import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.util.BitArray;
@@ -42,6 +43,8 @@ import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
public class RawFile {
+ public static final String FILE_EXTENSION = "raw";
+
private static final Log LOG = LogFactory.getLog(RawFile.class);
public static class RawFileScanner extends FileScanner implements SeekableScanner {
@@ -380,7 +383,7 @@ public class RawFile {
tableStats.setNumRows(recordCount);
}
- StorageUtil.closeBuffer(buffer);
+ UnsafeUtil.free(buffer);
IOUtils.cleanup(LOG, channel, fis);
}
@@ -722,7 +725,7 @@ public class RawFile {
LOG.debug("RawFileAppender written: " + getOffset() + " bytes, path: " + path);
}
- StorageUtil.closeBuffer(buffer);
+ UnsafeUtil.free(buffer);
IOUtils.cleanup(LOG, channel, randomAccessFile);
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/storage/RowStoreUtil.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/RowStoreUtil.java b/tajo-storage/src/main/java/org/apache/tajo/storage/RowStoreUtil.java
index 70044ca..24b6280 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/RowStoreUtil.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/RowStoreUtil.java
@@ -23,8 +23,10 @@ import org.apache.tajo.catalog.Schema;
import org.apache.tajo.common.TajoDataTypes;
import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.datum.IntervalDatum;
+import org.apache.tajo.datum.ProtobufDatum;
import org.apache.tajo.exception.UnsupportedException;
import org.apache.tajo.storage.exception.UnknownDataTypeException;
+import org.apache.tajo.tuple.offheap.RowWriter;
import org.apache.tajo.util.BitArray;
import java.nio.ByteBuffer;
@@ -177,7 +179,8 @@ public class RowStoreUtil {
nullFlags = new BitArray(schema.size());
headerSize = nullFlags.bytesLength();
}
- public byte [] toBytes(Tuple tuple) {
+
+ public byte[] toBytes(Tuple tuple) {
nullFlags.clear();
int size = estimateTupleDataSize(tuple);
ByteBuffer bb = ByteBuffer.allocate(size + headerSize);
@@ -191,42 +194,64 @@ public class RowStoreUtil {
col = schema.getColumn(i);
switch (col.getDataType().getType()) {
- case NULL_TYPE: nullFlags.set(i); break;
- case BOOLEAN: bb.put(tuple.get(i).asByte()); break;
- case BIT: bb.put(tuple.get(i).asByte()); break;
- case CHAR: bb.put(tuple.get(i).asByte()); break;
- case INT2: bb.putShort(tuple.get(i).asInt2()); break;
- case INT4: bb.putInt(tuple.get(i).asInt4()); break;
- case INT8: bb.putLong(tuple.get(i).asInt8()); break;
- case FLOAT4: bb.putFloat(tuple.get(i).asFloat4()); break;
- case FLOAT8: bb.putDouble(tuple.get(i).asFloat8()); break;
- case TEXT:
- byte [] _string = tuple.get(i).asByteArray();
- bb.putInt(_string.length);
- bb.put(_string);
- break;
- case DATE: bb.putInt(tuple.get(i).asInt4()); break;
- case TIME:
- case TIMESTAMP:
- bb.putLong(tuple.get(i).asInt8());
- break;
- case INTERVAL:
- IntervalDatum interval = (IntervalDatum) tuple.get(i);
- bb.putInt(interval.getMonths());
- bb.putLong(interval.getMilliSeconds());
- break;
- case BLOB:
- byte [] bytes = tuple.get(i).asByteArray();
- bb.putInt(bytes.length);
- bb.put(bytes);
- break;
- case INET4:
- byte [] ipBytes = tuple.get(i).asByteArray();
- bb.put(ipBytes);
- break;
- case INET6: bb.put(tuple.get(i).asByteArray()); break;
- default:
- throw new RuntimeException(new UnknownDataTypeException(col.getDataType().getType().name()));
+ case NULL_TYPE:
+ nullFlags.set(i);
+ break;
+ case BOOLEAN:
+ bb.put(tuple.get(i).asByte());
+ break;
+ case BIT:
+ bb.put(tuple.get(i).asByte());
+ break;
+ case CHAR:
+ bb.put(tuple.get(i).asByte());
+ break;
+ case INT2:
+ bb.putShort(tuple.get(i).asInt2());
+ break;
+ case INT4:
+ bb.putInt(tuple.get(i).asInt4());
+ break;
+ case INT8:
+ bb.putLong(tuple.get(i).asInt8());
+ break;
+ case FLOAT4:
+ bb.putFloat(tuple.get(i).asFloat4());
+ break;
+ case FLOAT8:
+ bb.putDouble(tuple.get(i).asFloat8());
+ break;
+ case TEXT:
+ byte[] _string = tuple.get(i).asByteArray();
+ bb.putInt(_string.length);
+ bb.put(_string);
+ break;
+ case DATE:
+ bb.putInt(tuple.get(i).asInt4());
+ break;
+ case TIME:
+ case TIMESTAMP:
+ bb.putLong(tuple.get(i).asInt8());
+ break;
+ case INTERVAL:
+ IntervalDatum interval = (IntervalDatum) tuple.get(i);
+ bb.putInt(interval.getMonths());
+ bb.putLong(interval.getMilliSeconds());
+ break;
+ case BLOB:
+ byte[] bytes = tuple.get(i).asByteArray();
+ bb.putInt(bytes.length);
+ bb.put(bytes);
+ break;
+ case INET4:
+ byte[] ipBytes = tuple.get(i).asByteArray();
+ bb.put(ipBytes);
+ break;
+ case INET6:
+ bb.put(tuple.get(i).asByteArray());
+ break;
+ default:
+ throw new RuntimeException(new UnknownDataTypeException(col.getDataType().getType().name()));
}
}
@@ -237,7 +262,7 @@ public class RowStoreUtil {
bb.position(finalPosition);
bb.flip();
- byte [] buf = new byte [bb.limit()];
+ byte[] buf = new byte[bb.limit()];
bb.get(buf);
return buf;
}
@@ -254,24 +279,38 @@ public class RowStoreUtil {
col = schema.getColumn(i);
switch (col.getDataType().getType()) {
- case BOOLEAN:
- case BIT:
- case CHAR: size += 1; break;
- case INT2: size += 2; break;
- case DATE:
- case INT4:
- case FLOAT4: size += 4; break;
- case TIME:
- case TIMESTAMP:
- case INT8:
- case FLOAT8: size += 8; break;
- case INTERVAL: size += 12; break;
- case TEXT:
- case BLOB: size += (4 + tuple.get(i).asByteArray().length); break;
- case INET4:
- case INET6: size += tuple.get(i).asByteArray().length; break;
- default:
- throw new RuntimeException(new UnknownDataTypeException(col.getDataType().getType().name()));
+ case BOOLEAN:
+ case BIT:
+ case CHAR:
+ size += 1;
+ break;
+ case INT2:
+ size += 2;
+ break;
+ case DATE:
+ case INT4:
+ case FLOAT4:
+ size += 4;
+ break;
+ case TIME:
+ case TIMESTAMP:
+ case INT8:
+ case FLOAT8:
+ size += 8;
+ break;
+ case INTERVAL:
+ size += 12;
+ break;
+ case TEXT:
+ case BLOB:
+ size += (4 + tuple.get(i).asByteArray().length);
+ break;
+ case INET4:
+ case INET6:
+ size += tuple.get(i).asByteArray().length;
+ break;
+ default:
+ throw new RuntimeException(new UnknownDataTypeException(col.getDataType().getType().name()));
}
}
@@ -284,4 +323,68 @@ public class RowStoreUtil {
return schema;
}
}
+
+  /**
+   * Writes the fields of {@code tuple} into {@code writer} as one row.
+   *
+   * <p>The writer's {@code dataTypes()} decide how many fields are written and which
+   * {@code put*} method encodes each one. Null fields, and fields whose declared type is
+   * {@code NULL_TYPE}, are passed to {@code skipField()} instead.
+   *
+   * @param tuple the tuple to read fields from
+   * @param writer the row writer; the row is bracketed by {@code startRow()} and {@code endRow()}
+   * @throws UnsupportedException if a declared data type has no mapping below
+   */
+  public static void convert(Tuple tuple, RowWriter writer) {
+    writer.startRow();
+
+    int numFields = writer.dataTypes().length;
+    for (int col = 0; col < numFields; col++) {
+      if (tuple.isNull(col)) {
+        writer.skipField();
+      } else {
+        switch (writer.dataTypes()[col].getType()) {
+          case BOOLEAN:
+            writer.putBool(tuple.getBool(col));
+            break;
+          case INT1:
+          case INT2:
+            // INT1 has no dedicated writer call here; it is widened and written as INT2.
+            writer.putInt2(tuple.getInt2(col));
+            break;
+          case INT4:
+          case DATE:
+          case INET4:
+            writer.putInt4(tuple.getInt4(col));
+            break;
+          case INT8:
+          case TIMESTAMP:
+          case TIME:
+            writer.putInt8(tuple.getInt8(col));
+            break;
+          case FLOAT4:
+            writer.putFloat4(tuple.getFloat4(col));
+            break;
+          case FLOAT8:
+            writer.putFloat8(tuple.getFloat8(col));
+            break;
+          case TEXT:
+            writer.putText(tuple.getBytes(col));
+            break;
+          case INTERVAL:
+            writer.putInterval((IntervalDatum) tuple.getInterval(col));
+            break;
+          case PROTOBUF:
+            writer.putProtoDatum((ProtobufDatum) tuple.getProtobufDatum(col));
+            break;
+          case NULL_TYPE:
+            writer.skipField();
+            break;
+          default:
+            throw new UnsupportedException("Unknown data type: " + writer.dataTypes()[col]);
+        }
+      }
+    }
+    writer.endRow();
+  }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/storage/StorageUtil.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/StorageUtil.java b/tajo-storage/src/main/java/org/apache/tajo/storage/StorageUtil.java
index 07fa16b..f35c9ee 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/StorageUtil.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/StorageUtil.java
@@ -30,13 +30,11 @@ import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.util.FileUtil;
import org.apache.tajo.util.KeyValueSet;
import parquet.hadoop.ParquetOutputFormat;
-import sun.nio.ch.DirectBuffer;
import java.io.DataInput;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
@@ -193,16 +191,6 @@ public class StorageUtil extends StorageConstants {
}
}
- public static void closeBuffer(ByteBuffer buffer) {
- if (buffer != null) {
- if (buffer.isDirect()) {
- ((DirectBuffer) buffer).cleaner().clean();
- } else {
- buffer.clear();
- }
- }
- }
-
public static int readFully(InputStream is, byte[] buffer, int offset, int length)
throws IOException {
int nread = 0;
http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/storage/TableStatistics.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/TableStatistics.java b/tajo-storage/src/main/java/org/apache/tajo/storage/TableStatistics.java
index a2c08de..64e62ba 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/TableStatistics.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/TableStatistics.java
@@ -26,7 +26,6 @@ import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.common.TajoDataTypes.DataType;
import org.apache.tajo.common.TajoDataTypes.Type;
import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.NullDatum;
/**
* This class is not thread-safe.
@@ -69,6 +68,15 @@ public class TableStatistics {
numRows++;
}
+  /**
+   * Adds {@code num} rows to the running row count.
+   *
+   * @param num the number of rows to add
+   */
+  public void incrementRows(long num) {
+    numRows += num;
+  }
+
public long getNumRows() {
return this.numRows;
}
@@ -82,7 +86,7 @@ public class TableStatistics {
}
public void analyzeField(int idx, Datum datum) {
- if (datum instanceof NullDatum) {
+ if (datum.isNull()) {
numNulls[idx]++;
return;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java
index d2ccdc7..b42c1b5 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java
@@ -45,7 +45,7 @@ public class TextSerializerDeserializer implements SerializerDeserializer {
int length = 0;
TajoDataTypes.DataType dataType = col.getDataType();
- if (datum == null || datum instanceof NullDatum) {
+ if (datum == null || datum.isNull()) {
switch (dataType.getType()) {
case CHAR:
case TEXT:
http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/storage/Tuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/Tuple.java b/tajo-storage/src/main/java/org/apache/tajo/storage/Tuple.java
index c183171..53e68c7 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/Tuple.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/Tuple.java
@@ -19,7 +19,6 @@
package org.apache.tajo.storage;
import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.ProtobufDatum;
public interface Tuple extends Cloneable {
@@ -28,6 +27,9 @@ public interface Tuple extends Cloneable {
public boolean contains(int fieldid);
public boolean isNull(int fieldid);
+
+ @SuppressWarnings("unused")
+ public boolean isNotNull(int fieldid);
public void clear();
@@ -65,7 +67,9 @@ public interface Tuple extends Cloneable {
public String getText(int fieldId);
- public ProtobufDatum getProtobufDatum(int fieldId);
+ public Datum getProtobufDatum(int fieldId);
+
+ public Datum getInterval(int fieldId);
public char [] getUnicodeChars(int fieldId);
http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/storage/TupleComparator.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/TupleComparator.java b/tajo-storage/src/main/java/org/apache/tajo/storage/TupleComparator.java
index 51388a4..720226b 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/TupleComparator.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/TupleComparator.java
@@ -1,4 +1,4 @@
-/**
+/***
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -18,164 +18,8 @@
package org.apache.tajo.storage;
-import com.google.common.base.Objects;
-import com.google.common.base.Preconditions;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.SortSpec;
-import org.apache.tajo.common.ProtoObject;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.NullDatum;
-
import java.util.Comparator;
-import static org.apache.tajo.catalog.proto.CatalogProtos.TupleComparatorSpecProto;
-import static org.apache.tajo.index.IndexProtos.TupleComparatorProto;
-
-/**
- * The Comparator class for Tuples
- *
- * @see Tuple
- */
-public class TupleComparator implements Comparator<Tuple>, ProtoObject<TupleComparatorProto> {
- private final int[] sortKeyIds;
- private final boolean[] asc;
- @SuppressWarnings("unused")
- private final boolean[] nullFirsts;
-
- private Datum left;
- private Datum right;
- private int compVal;
-
- /**
- * @param schema The schema of input tuples
- * @param sortKeys The description of sort keys
- */
- public TupleComparator(Schema schema, SortSpec[] sortKeys) {
- Preconditions.checkArgument(sortKeys.length > 0,
- "At least one sort key must be specified.");
-
- this.sortKeyIds = new int[sortKeys.length];
- this.asc = new boolean[sortKeys.length];
- this.nullFirsts = new boolean[sortKeys.length];
- for (int i = 0; i < sortKeys.length; i++) {
- if (sortKeys[i].getSortKey().hasQualifier()) {
- this.sortKeyIds[i] = schema.getColumnId(sortKeys[i].getSortKey().getQualifiedName());
- } else {
- this.sortKeyIds[i] = schema.getColumnIdByName(sortKeys[i].getSortKey().getSimpleName());
- }
-
- this.asc[i] = sortKeys[i].isAscending();
- this.nullFirsts[i]= sortKeys[i].isNullFirst();
- }
- }
-
- public TupleComparator(TupleComparatorProto proto) {
- this.sortKeyIds = new int[proto.getCompSpecsCount()];
- this.asc = new boolean[proto.getCompSpecsCount()];
- this.nullFirsts = new boolean[proto.getCompSpecsCount()];
-
- for (int i = 0; i < proto.getCompSpecsCount(); i++) {
- TupleComparatorSpecProto sortSepcProto = proto.getCompSpecs(i);
- sortKeyIds[i] = sortSepcProto.getColumnId();
- asc[i] = sortSepcProto.getAscending();
- nullFirsts[i] = sortSepcProto.getNullFirst();
- }
- }
-
- public boolean isAscendingFirstKey() {
- return this.asc[0];
- }
-
- @Override
- public int compare(Tuple tuple1, Tuple tuple2) {
- for (int i = 0; i < sortKeyIds.length; i++) {
- left = tuple1.get(sortKeyIds[i]);
- right = tuple2.get(sortKeyIds[i]);
-
- if (left instanceof NullDatum || right instanceof NullDatum) {
- if (!left.equals(right)) {
- if (left instanceof NullDatum) {
- compVal = 1;
- } else if (right instanceof NullDatum) {
- compVal = -1;
- }
- if (nullFirsts[i]) {
- if (compVal != 0) {
- compVal *= -1;
- }
- }
- } else {
- compVal = 0;
- }
- } else {
- if (asc[i]) {
- compVal = left.compareTo(right);
- } else {
- compVal = right.compareTo(left);
- }
- }
-
- if (compVal < 0 || compVal > 0) {
- return compVal;
- }
- }
- return 0;
- }
-
- @Override
- public int hashCode() {
- return Objects.hashCode(sortKeyIds);
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj instanceof TupleComparator) {
- TupleComparator other = (TupleComparator) obj;
- if (sortKeyIds.length != other.sortKeyIds.length) {
- return false;
- }
-
- for (int i = 0; i < sortKeyIds.length; i++) {
- if (sortKeyIds[i] != other.sortKeyIds[i] ||
- asc[i] != other.asc[i] ||
- nullFirsts[i] != other.nullFirsts[i]) {
- return false;
- }
- }
-
- return true;
- } else {
- return false;
- }
- }
-
- @Override
- public TupleComparatorProto getProto() {
- TupleComparatorProto.Builder builder = TupleComparatorProto.newBuilder();
- TupleComparatorSpecProto.Builder sortSpecBuilder;
-
- for (int i = 0; i < sortKeyIds.length; i++) {
- sortSpecBuilder = TupleComparatorSpecProto.newBuilder();
- sortSpecBuilder.setColumnId(sortKeyIds[i]);
- sortSpecBuilder.setAscending(asc[i]);
- sortSpecBuilder.setNullFirst(nullFirsts[i]);
- builder.addCompSpecs(sortSpecBuilder);
- }
-
- return builder.build();
- }
-
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder();
-
- String prefix = "";
- for (int i = 0; i < sortKeyIds.length; i++) {
- sb.append(prefix).append("SortKeyId=").append(sortKeyIds[i])
- .append(",Asc=").append(asc[i])
- .append(",NullFirst=").append(nullFirsts[i]);
- prefix = " ,";
- }
- return sb.toString();
- }
-}
\ No newline at end of file
+/**
+ * Base class for comparators that order {@link Tuple}s.
+ *
+ * <p>Concrete subclasses such as {@link BaseTupleComparator} define the ordering in
+ * {@link #compare(Tuple, Tuple)}.
+ */
+public abstract class TupleComparator implements Comparator<Tuple> {
+  @Override
+  public abstract int compare(Tuple o1, Tuple o2);
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/storage/TupleRange.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/TupleRange.java b/tajo-storage/src/main/java/org/apache/tajo/storage/TupleRange.java
index 6cc09d4..dba02f7 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/TupleRange.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/TupleRange.java
@@ -30,10 +30,10 @@ import java.util.Comparator;
public class TupleRange implements Comparable<TupleRange>, Cloneable {
private Tuple start;
private Tuple end;
- private final TupleComparator comp;
+ private final BaseTupleComparator comp;
public TupleRange(final SortSpec[] sortSpecs, final Tuple start, final Tuple end) {
- this.comp = new TupleComparator(sortSpecsToSchema(sortSpecs), sortSpecs);
+ this.comp = new BaseTupleComparator(sortSpecsToSchema(sortSpecs), sortSpecs);
// if there is only one value, start == end
this.start = start;
this.end = end;
http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/storage/VTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/VTuple.java b/tajo-storage/src/main/java/org/apache/tajo/storage/VTuple.java
index 4fb35f9..0e2560c 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/VTuple.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/VTuple.java
@@ -19,10 +19,7 @@
package org.apache.tajo.storage;
import com.google.gson.annotations.Expose;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.Inet4Datum;
-import org.apache.tajo.datum.NullDatum;
-import org.apache.tajo.datum.ProtobufDatum;
+import org.apache.tajo.datum.*;
import org.apache.tajo.exception.UnimplementedException;
import java.net.InetAddress;
@@ -38,7 +35,6 @@ public class VTuple implements Tuple, Cloneable {
public VTuple(Tuple tuple) {
this.values = tuple.getValues().clone();
- this.offset = ((VTuple)tuple).offset;
}
public VTuple(Datum [] datum) {
@@ -57,7 +53,12 @@ public class VTuple implements Tuple, Cloneable {
@Override
public boolean isNull(int fieldid) {
- return values[fieldid] instanceof NullDatum;
+ return values[fieldid].isNull();
+ }
+
+ @Override
+ public boolean isNotNull(int fieldid) {
+ return !isNull(fieldid);
}
@Override
@@ -179,6 +180,11 @@ public class VTuple implements Tuple, Cloneable {
}
@Override
+ public IntervalDatum getInterval(int fieldId) {
+ return (IntervalDatum) values[fieldId];
+ }
+
+ @Override
public char[] getUnicodeChars(int fieldId) {
return values[fieldId].asUnicodeChars();
}
@@ -193,23 +199,7 @@ public class VTuple implements Tuple, Cloneable {
}
public String toString() {
- boolean first = true;
- StringBuilder str = new StringBuilder();
- str.append("(");
- for(int i=0; i < values.length; i++) {
- if(values[i] != null) {
- if(first) {
- first = false;
- } else {
- str.append(", ");
- }
- str.append(i)
- .append("=>")
- .append(values[i]);
- }
- }
- str.append(")");
- return str.toString();
+ return toDisplayString(getValues());
}
@Override
@@ -230,4 +220,24 @@ public class VTuple implements Tuple, Cloneable {
}
return false;
}
+
+  /**
+   * Renders datums as {@code (index=>value, ...)}, skipping null array slots.
+   *
+   * @param values the datums to render; null entries are omitted from the output
+   * @return the parenthesized, comma-separated representation
+   */
+  public static String toDisplayString(Datum [] values) {
+    StringBuilder out = new StringBuilder("(");
+    String separator = "";
+    for (int idx = 0; idx < values.length; idx++) {
+      if (values[idx] == null) {
+        continue;
+      }
+      out.append(separator).append(idx).append("=>").append(values[idx]);
+      separator = ", ";
+    }
+    return out.append(")").toString();
+  }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java b/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java
index 6af8da0..6aca8d7 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java
@@ -18,30 +18,27 @@
package org.apache.tajo.storage.avro;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.List;
-
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.storage.FileAppender;
import org.apache.tajo.storage.TableStatistics;
import org.apache.tajo.storage.Tuple;
+import java.io.FileNotFoundException;
import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
/**
* FileAppender for writing to Avro files.
@@ -102,7 +99,7 @@ public class AvroAppender extends FileAppender {
}
private Object getPrimitive(Tuple tuple, int i, Schema.Type avroType) {
- if (tuple.get(i) instanceof NullDatum) {
+ if (tuple.get(i).isNull()) {
return null;
}
switch (avroType) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/storage/index/IndexMethod.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/index/IndexMethod.java b/tajo-storage/src/main/java/org/apache/tajo/storage/index/IndexMethod.java
index 74be7ff..7024bdc 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/index/IndexMethod.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/index/IndexMethod.java
@@ -20,13 +20,13 @@ package org.apache.tajo.storage.index;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.storage.TupleComparator;
+import org.apache.tajo.storage.BaseTupleComparator;
import java.io.IOException;
public interface IndexMethod {
IndexWriter getIndexWriter(final Path fileName, int level, Schema keySchema,
- TupleComparator comparator) throws IOException;
+ BaseTupleComparator comparator) throws IOException;
IndexReader getIndexReader(final Path fileName, Schema keySchema,
- TupleComparator comparator) throws IOException;
+ BaseTupleComparator comparator) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java b/tajo-storage/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java
index 5d43bd5..d24d474 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java
@@ -27,12 +27,12 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.proto.CatalogProtos.SchemaProto;
+import org.apache.tajo.storage.BaseTupleComparator;
import org.apache.tajo.storage.RowStoreUtil;
import org.apache.tajo.storage.RowStoreUtil.RowStoreDecoder;
import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder;
import org.apache.tajo.storage.StorageUtil;
import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.TupleComparator;
import org.apache.tajo.storage.index.IndexMethod;
import org.apache.tajo.storage.index.IndexWriter;
import org.apache.tajo.storage.index.OrderIndexReader;
@@ -67,13 +67,13 @@ public class BSTIndex implements IndexMethod {
@Override
public BSTIndexWriter getIndexWriter(final Path fileName, int level, Schema keySchema,
- TupleComparator comparator) throws IOException {
+ BaseTupleComparator comparator) throws IOException {
return new BSTIndexWriter(fileName, level, keySchema, comparator);
}
@Override
public BSTIndexReader getIndexReader(Path fileName, Schema keySchema,
- TupleComparator comparator) throws IOException {
+ BaseTupleComparator comparator) throws IOException {
return new BSTIndexReader(fileName, keySchema, comparator);
}
@@ -89,7 +89,7 @@ public class BSTIndex implements IndexMethod {
private Path fileName;
private final Schema keySchema;
- private final TupleComparator compartor;
+ private final BaseTupleComparator compartor;
private final KeyOffsetCollector collector;
private KeyOffsetCollector rootCollector;
@@ -108,7 +108,7 @@ public class BSTIndex implements IndexMethod {
* @throws IOException
*/
public BSTIndexWriter(final Path fileName, int level, Schema keySchema,
- TupleComparator comparator) throws IOException {
+ BaseTupleComparator comparator) throws IOException {
this.fileName = fileName;
this.level = level;
this.keySchema = keySchema;
@@ -141,7 +141,7 @@ public class BSTIndex implements IndexMethod {
collector.put(key, offset);
}
- public TupleComparator getComparator() {
+ public BaseTupleComparator getComparator() {
return this.compartor;
}
@@ -253,7 +253,7 @@ public class BSTIndex implements IndexMethod {
private class KeyOffsetCollector {
private TreeMap<Tuple, LinkedList<Long>> map;
- public KeyOffsetCollector(TupleComparator comparator) {
+ public KeyOffsetCollector(BaseTupleComparator comparator) {
map = new TreeMap<Tuple, LinkedList<Long>>(comparator);
}
@@ -283,7 +283,7 @@ public class BSTIndex implements IndexMethod {
public class BSTIndexReader implements OrderIndexReader , Closeable{
private Path fileName;
private Schema keySchema;
- private TupleComparator comparator;
+ private BaseTupleComparator comparator;
private FileSystem fs;
private FSDataInputStream indexIn;
@@ -312,7 +312,7 @@ public class BSTIndex implements IndexMethod {
* @param comparator
* @throws IOException
*/
- public BSTIndexReader(final Path fileName, Schema keySchema, TupleComparator comparator) throws IOException {
+ public BSTIndexReader(final Path fileName, Schema keySchema, BaseTupleComparator comparator) throws IOException {
this.fileName = fileName;
this.keySchema = keySchema;
this.comparator = comparator;
@@ -327,7 +327,7 @@ public class BSTIndex implements IndexMethod {
return this.keySchema;
}
- public TupleComparator getComparator() {
+ public BaseTupleComparator getComparator() {
return this.comparator;
}
@@ -350,7 +350,7 @@ public class BSTIndex implements IndexMethod {
TupleComparatorProto.Builder compProto = TupleComparatorProto.newBuilder();
compProto.mergeFrom(compBytes);
- this.comparator = new TupleComparator(compProto.build());
+ this.comparator = new BaseTupleComparator(compProto.build());
// level
this.level = indexIn.readInt();
http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileScanner.java b/tajo-storage/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileScanner.java
new file mode 100644
index 0000000..e0c7c97
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileScanner.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.rawfile;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.storage.FileScanner;
+import org.apache.tajo.storage.SeekableScanner;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.tuple.offheap.OffHeapRowBlock;
+import org.apache.tajo.tuple.offheap.OffHeapRowBlockReader;
+import org.apache.tajo.tuple.offheap.ZeroCopyTuple;
+import org.apache.tajo.unit.StorageUnit;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+
+/**
+ * Scanner for local "direct raw" files. Rows are read block-wise from a {@link FileChannel} into an
+ * off-heap {@link OffHeapRowBlock} and handed out as a reused {@link ZeroCopyTuple}.
+ */
+public class DirectRawFileScanner extends FileScanner implements SeekableScanner {
+  private static final Log LOG = LogFactory.getLog(DirectRawFileScanner.class);
+
+  private FileChannel channel;
+  private TajoDataTypes.DataType[] columnTypes;
+  private Path path;
+
+  // Set once copyFromChannel() reports no more data; cleared again by reset() and seek().
+  private boolean eof = false;
+  // True when the current row block is exhausted (or was never loaded) and the next one must be read.
+  private boolean fetchNeeded = true;
+  private long fileSize;
+  private FileInputStream fis;
+  private long recordCount;
+
+  // One instance is returned by every next() call, so callers must not keep it across calls.
+  private ZeroCopyTuple unSafeTuple = new ZeroCopyTuple();
+  // Both are created in init(); a reader over a not-yet-allocated block would be useless.
+  private OffHeapRowBlock tupleBuffer;
+  private OffHeapRowBlockReader reader;
+
+  public DirectRawFileScanner(Configuration conf, Schema schema, TableMeta meta, Path path) throws IOException {
+    super(conf, schema, meta, null);
+    this.path = path;
+  }
+
+  @SuppressWarnings("unused")
+  public DirectRawFileScanner(Configuration conf, Schema schema, TableMeta meta, FileFragment fragment) throws IOException {
+    this(conf, schema, meta, fragment.getPath());
+  }
+
+  /**
+   * Opens the local file behind {@code path}, allocates a 64KB off-heap row block and loads the first block.
+   *
+   * @throws IOException if the path cannot be mapped to a local file or the file cannot be opened
+   */
+  public void init() throws IOException {
+    File file;
+    try {
+      if (path.toUri().getScheme() != null) {
+        file = new File(path.toUri());
+      } else {
+        file = new File(path.toString());
+      }
+    } catch (IllegalArgumentException iae) {
+      throw new IOException(iae);
+    }
+    fis = new FileInputStream(file);
+    channel = fis.getChannel();
+    fileSize = channel.size();
+
+    if (tableStats != null) {
+      tableStats.setNumBytes(fileSize);
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("RawFileScanner open:" + path + "," + channel.position() + ", size :" + channel.size());
+    }
+
+    columnTypes = new TajoDataTypes.DataType[schema.size()];
+    for (int i = 0; i < schema.size(); i++) {
+      columnTypes[i] = schema.getColumn(i).getDataType();
+    }
+
+    tupleBuffer = new OffHeapRowBlock(schema, 64 * StorageUnit.KB);
+    reader = new OffHeapRowBlockReader(tupleBuffer);
+
+    // Pre-load the first block; if nothing could be read, next() will try again (and then hit EOF).
+    fetchNeeded = !next(tupleBuffer);
+
+    super.init();
+  }
+
+  @Override
+  public long getNextOffset() throws IOException {
+    return channel.position() - reader.remainForRead();
+  }
+
+  /**
+   * Moves the channel to {@code offset}; the next {@link #next()} call reads a fresh block from there.
+   */
+  @Override
+  public void seek(long offset) throws IOException {
+    channel.position(offset);
+    fetchNeeded = true;
+    // Data may be available again at the new position even if the end was reached before.
+    eof = false;
+  }
+
+  /**
+   * Fills {@code rowblock} from the channel.
+   *
+   * @return the result of {@link OffHeapRowBlock#copyFromChannel}; false is treated as end of data
+   */
+  public boolean next(OffHeapRowBlock rowblock) throws IOException {
+    return rowblock.copyFromChannel(channel, tableStats);
+  }
+
+  /**
+   * Returns the next row, or null once the file is exhausted. The returned tuple instance is reused.
+   */
+  @Override
+  public Tuple next() throws IOException {
+    if (eof) {
+      return null;
+    }
+
+    while (true) {
+      if (fetchNeeded) {
+        if (!next(tupleBuffer)) {
+          // Remember the end so later calls return immediately and getProgress() reports completion.
+          eof = true;
+          return null;
+        }
+        reader.reset();
+      }
+
+      fetchNeeded = !reader.next(unSafeTuple);
+
+      if (!fetchNeeded) {
+        recordCount++;
+        return unSafeTuple;
+      }
+    }
+  }
+
+  @Override
+  public void reset() throws IOException {
+    // reload initial buffer
+    fetchNeeded = true;
+    channel.position(0);
+    eof = false;
+    reader.reset();
+  }
+
+  /**
+   * Publishes final statistics, frees the off-heap block and closes the file. Safe to call before
+   * {@link #init()} or more than once.
+   */
+  @Override
+  public void close() throws IOException {
+    if (tableStats != null) {
+      tableStats.setReadBytes(fileSize);
+      tableStats.setNumRows(recordCount);
+    }
+    if (tupleBuffer != null) {
+      tupleBuffer.release();
+      tupleBuffer = null;
+    }
+    reader = null;
+    IOUtils.cleanup(LOG, channel, fis);
+  }
+
+  @Override
+  public boolean isProjectable() {
+    return false;
+  }
+
+  @Override
+  public boolean isSelectable() {
+    return false;
+  }
+
+  @Override
+  public boolean isSplittable(){
+    return false;
+  }
+
+  /**
+   * Reports the fraction of the file consumed, updating row/byte statistics when they are collected.
+   */
+  @Override
+  public float getProgress() {
+    try {
+      if (eof || channel == null) {
+        updateProgressStats(fileSize);
+        return 1.0f;
+      }
+
+      long filePos = channel.position();
+      updateProgressStats(filePos);
+
+      if (filePos == 0) {
+        return 0.0f;
+      }
+      return Math.min(1.0f, ((float) filePos / (float) fileSize));
+    } catch (IOException e) {
+      LOG.error(e.getMessage(), e);
+      return 0.0f;
+    }
+  }
+
+  // tableStats may be absent (init() and close() already tolerate that), so only record when present.
+  private void updateProgressStats(long readBytes) {
+    if (tableStats != null) {
+      tableStats.setNumRows(recordCount);
+      tableStats.setReadBytes(readBytes);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileWriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileWriter.java b/tajo-storage/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileWriter.java
new file mode 100644
index 0000000..1108163
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileWriter.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.rawfile;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.storage.FileAppender;
+import org.apache.tajo.storage.RowStoreUtil;
+import org.apache.tajo.storage.TableStatistics;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.tuple.BaseTupleBuilder;
+import org.apache.tajo.tuple.offheap.OffHeapRowBlock;
+import org.apache.tajo.tuple.offheap.UnSafeTuple;
+import org.apache.tajo.unit.StorageUnit;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+/**
+ * Appender for local "direct raw" files. Rows are written in the off-heap {@link UnSafeTuple} layout,
+ * either one tuple at a time (staged in a direct buffer) or as whole {@link OffHeapRowBlock}s.
+ */
+public class DirectRawFileWriter extends FileAppender {
+  public static final String FILE_EXTENSION = "draw";
+  private static final Log LOG = LogFactory.getLog(DirectRawFileWriter.class);
+  // Capacity of the direct buffer that batches tuples appended through addTuple().
+  private static final int WRITE_BUFFER_SIZE = 64 * StorageUnit.KB;
+
+  private FileChannel channel;
+  private RandomAccessFile randomAccessFile;
+  private TajoDataTypes.DataType[] columnTypes;
+  // Logical end of the written data: bytes already on the channel plus bytes still buffered.
+  private long pos;
+
+  private TableStatistics stats;
+
+  // Converts tuples that are not UnSafeTuples into the off-heap row layout before writing.
+  private BaseTupleBuilder builder;
+
+  // Lazily allocated by the first addTuple(); holds tuples not yet written to the channel.
+  private ByteBuffer buffer;
+
+  public DirectRawFileWriter(Configuration conf, Schema schema, TableMeta meta, Path path) throws IOException {
+    super(conf, schema, meta, path);
+  }
+
+  /**
+   * Opens (or creates) the local file behind {@code path} for writing.
+   *
+   * @throws IOException if the path cannot be mapped to a local file or the file cannot be opened
+   */
+  public void init() throws IOException {
+    File file;
+    try {
+      if (path.toUri().getScheme() != null) {
+        file = new File(path.toUri());
+      } else {
+        file = new File(path.toString());
+      }
+    } catch (IllegalArgumentException iae) {
+      throw new IOException(iae);
+    }
+
+    randomAccessFile = new RandomAccessFile(file, "rw");
+    channel = randomAccessFile.getChannel();
+    pos = 0;
+
+    columnTypes = new TajoDataTypes.DataType[schema.size()];
+    for (int i = 0; i < schema.size(); i++) {
+      columnTypes[i] = schema.getColumn(i).getDataType();
+    }
+
+    if (enabledStats) {
+      this.stats = new TableStatistics(this.schema);
+    }
+
+    builder = new BaseTupleBuilder(schema);
+
+    super.init();
+  }
+
+  @Override
+  public long getOffset() throws IOException {
+    return pos;
+  }
+
+  /**
+   * Writes a whole row block. Tuples still staged by {@link #addTuple(Tuple)} are written first so the
+   * file keeps the order in which rows were appended.
+   */
+  public void writeRowBlock(OffHeapRowBlock rowBlock) throws IOException {
+    drainBuffer();
+    writeFully(rowBlock.nioBuffer());
+    if (enabledStats) {
+      stats.incrementRows(rowBlock.rows());
+    }
+
+    pos = channel.position();
+  }
+
+  // Writes every byte staged in the buffer to the channel and empties it.
+  private void drainBuffer() throws IOException {
+    if (buffer != null) {
+      buffer.flip();
+      writeFully(buffer);
+      buffer.clear();
+    }
+  }
+
+  // A single FileChannel.write() may write fewer bytes than remain, so keep writing until drained.
+  private void writeFully(ByteBuffer src) throws IOException {
+    while (src.hasRemaining()) {
+      channel.write(src);
+    }
+  }
+
+  /**
+   * Appends one tuple. Tuples that are not already {@link UnSafeTuple}s are first converted with the
+   * internal {@link BaseTupleBuilder}. A row that does not fit into an empty staging buffer is written
+   * straight to the channel instead of overflowing the buffer.
+   */
+  @Override
+  public void addTuple(Tuple t) throws IOException {
+    if (enabledStats) {
+      for (int i = 0; i < schema.size(); i++) {
+        stats.analyzeField(i, t.get(i));
+      }
+    }
+
+    if (buffer == null) {
+      buffer = ByteBuffer.allocateDirect(WRITE_BUFFER_SIZE);
+    }
+
+    UnSafeTuple unSafeTuple;
+    if (t instanceof UnSafeTuple) {
+      unSafeTuple = (UnSafeTuple) t;
+    } else {
+      RowStoreUtil.convert(t, builder);
+      unSafeTuple = builder.buildToZeroCopyTuple();
+    }
+
+    ByteBuffer bb = unSafeTuple.nioBuffer();
+    int length = bb.remaining();
+    if (length > buffer.remaining()) {
+      drainBuffer();
+    }
+    if (length > buffer.capacity()) {
+      writeFully(bb);
+    } else {
+      buffer.put(bb);
+    }
+
+    pos = channel.position() + buffer.position();
+
+    if (enabledStats) {
+      stats.incrementRow();
+    }
+  }
+
+  @Override
+  public void flush() throws IOException {
+    drainBuffer();
+  }
+
+  /**
+   * Flushes staged tuples, finalizes statistics, closes the file and frees the builder's off-heap buffer.
+   */
+  @Override
+  public void close() throws IOException {
+    flush();
+    if (enabledStats) {
+      stats.setNumBytes(getOffset());
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("RawFileAppender written: " + getOffset() + " bytes, path: " + path);
+    }
+
+    IOUtils.cleanup(LOG, channel, randomAccessFile);
+
+    // The builder owns a direct buffer that BaseTupleBuilder#release() frees explicitly.
+    if (builder != null) {
+      builder.release();
+      builder = null;
+    }
+  }
+
+  @Override
+  public TableStats getStats() {
+    if (enabledStats) {
+      stats.setNumBytes(pos);
+      return stats.getTableStat();
+    } else {
+      return null;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java
new file mode 100644
index 0000000..ec08250
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java
@@ -0,0 +1,110 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SchemaUtil;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.tuple.offheap.*;
+import org.apache.tajo.unit.StorageUnit;
+import org.apache.tajo.util.Deallocatable;
+import org.apache.tajo.util.FileUtil;
+import org.apache.tajo.util.UnsafeUtil;
+import sun.misc.Unsafe;
+import sun.nio.ch.DirectBuffer;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+/**
+ * A {@link TupleBuilder} that writes a row into its own growable direct buffer and can emit the
+ * result either as an on-heap copy ({@link HeapTuple}) or as a view over that buffer ({@link ZeroCopyTuple}).
+ */
+public class BaseTupleBuilder extends OffHeapRowWriter implements TupleBuilder, Deallocatable {
+  private static final Log LOG = LogFactory.getLog(BaseTupleBuilder.class);
+
+  private static final Unsafe UNSAFE = UnsafeUtil.unsafe;
+
+  // buffer
+  private ByteBuffer buffer;
+  private long address;
+
+  public BaseTupleBuilder(Schema schema) {
+    super(SchemaUtil.toDataTypes(schema));
+    buffer = ByteBuffer.allocateDirect(64 * StorageUnit.KB).order(ByteOrder.nativeOrder());
+    address = UnsafeUtil.getAddress(buffer);
+  }
+
+  @Override
+  public long address() {
+    return address;
+  }
+
+  /**
+   * Replaces the backing buffer with a larger one when fewer than {@code size} bytes remain.
+   *
+   * @param size number of bytes that must be available
+   * @throws IllegalStateException if the required capacity exceeds the maximum direct buffer size
+   */
+  public void ensureSize(int size) {
+    if (buffer.remaining() - size < 0) { // check the remain size
+      // Keep doubling until the request fits; a single doubling is not enough for large values.
+      long newCapacity = (long) buffer.capacity() * 2;
+      while (newCapacity < size) {
+        newCapacity *= 2;
+      }
+      if (newCapacity > Integer.MAX_VALUE) {
+        throw new IllegalStateException("Cannot grow tuple buffer to hold " + size + " bytes");
+      }
+      int newBlockSize = UnsafeUtil.alignedSize((int) newCapacity);
+      // Same byte order and address lookup as the buffer created in the constructor.
+      ByteBuffer newByteBuf = ByteBuffer.allocateDirect(newBlockSize).order(ByteOrder.nativeOrder());
+      long newAddress = UnsafeUtil.getAddress(newByteBuf);
+      // endRow() shrinks the limit to the previous row's length, so copy the whole old capacity
+      // to keep every byte of the row currently being written.
+      UNSAFE.copyMemory(this.address, newAddress, buffer.capacity());
+      LOG.debug("Increase the buffer size to " + FileUtil.humanReadableByteCount(newBlockSize, false));
+
+      // release existing buffer and replace variables
+      UnsafeUtil.free(buffer);
+      buffer = newByteBuf;
+      address = newAddress;
+    }
+  }
+
+  @Override
+  public int position() {
+    return 0;
+  }
+
+  @Override
+  public void forward(int length) {
+  }
+
+  /** Finishes the row and exposes exactly its bytes through the buffer's [0, offset) window. */
+  @Override
+  public void endRow() {
+    super.endRow();
+    buffer.position(0).limit(offset());
+  }
+
+  @Override
+  public Tuple build() {
+    return buildToHeapTuple();
+  }
+
+  /** Copies the finished row onto the Java heap. */
+  public HeapTuple buildToHeapTuple() {
+    byte [] bytes = new byte[buffer.limit()];
+    // The destination is a byte[], so the byte-array base offset is the correct one.
+    UNSAFE.copyMemory(null, address, bytes, Unsafe.ARRAY_BYTE_BASE_OFFSET, buffer.limit());
+    return new HeapTuple(bytes, dataTypes());
+  }
+
+  /** Returns a tuple that refers to this builder's buffer; it is only valid until the buffer is rewritten or released. */
+  public ZeroCopyTuple buildToZeroCopyTuple() {
+    ZeroCopyTuple zcTuple = new ZeroCopyTuple();
+    zcTuple.set(buffer, 0, buffer.limit(), dataTypes());
+    return zcTuple;
+  }
+
+  public void release() {
+    UnsafeUtil.free(buffer);
+    buffer = null;
+    address = 0;
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/tuple/RowBlockReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/RowBlockReader.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/RowBlockReader.java
new file mode 100644
index 0000000..be734e1
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/RowBlockReader.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple;
+
+import org.apache.tajo.storage.Tuple;
+
+/**
+ * Iterates over the rows stored in a row block, filling a caller-supplied tuple for each row.
+ *
+ * @param <T> the tuple type this reader fills
+ */
+public interface RowBlockReader<T extends Tuple> {
+
+  /**
+   * Reads the next row of the block into {@code tuple}.
+   *
+   * @param tuple the tuple instance to be filled
+   * @return true if a row was read into {@code tuple}; false when the block has no more rows
+   */
+  boolean next(T tuple);
+
+  /**
+   * Resets the reader's read position, e.g. after the underlying block has been refilled.
+   */
+  void reset();
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/tuple/TupleBuilder.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/TupleBuilder.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/TupleBuilder.java
new file mode 100644
index 0000000..c43c018
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/TupleBuilder.java
@@ -0,0 +1,26 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple;
+
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.tuple.offheap.RowWriter;
+
+/**
+ * A {@link RowWriter} that can turn the row written so far into a {@link Tuple}.
+ */
+public interface TupleBuilder extends RowWriter {
+  /**
+   * @return a tuple built from the row that has been written
+   */
+  Tuple build();
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/DirectBufTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/DirectBufTuple.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/DirectBufTuple.java
new file mode 100644
index 0000000..9662d5a
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/DirectBufTuple.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import org.apache.tajo.util.Deallocatable;
+import org.apache.tajo.util.UnsafeUtil;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import static org.apache.tajo.common.TajoDataTypes.DataType;
+
+/**
+ * An {@link UnSafeTuple} backed by its own native-ordered direct {@link ByteBuffer},
+ * which is freed through {@link #release()}.
+ */
+public class DirectBufTuple extends UnSafeTuple implements Deallocatable {
+  private ByteBuffer bb;
+
+  /**
+   * @param length number of bytes to allocate for the tuple
+   * @param types  data types of the tuple's fields
+   */
+  public DirectBufTuple(int length, DataType[] types) {
+    ByteBuffer allocated = ByteBuffer.allocateDirect(length);
+    allocated.order(ByteOrder.nativeOrder());
+    this.bb = allocated;
+    set(this.bb, 0, length, types);
+  }
+
+  /** Frees the direct buffer backing this tuple. */
+  @Override
+  public void release() {
+    UnsafeUtil.free(this.bb);
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/FixedSizeLimitSpec.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/FixedSizeLimitSpec.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/FixedSizeLimitSpec.java
new file mode 100644
index 0000000..a327123
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/FixedSizeLimitSpec.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+/**
+ * A {@link ResizableLimitSpec} whose two size arguments are the same value, i.e. (presumably)
+ * an initial size equal to its limit, so the memory it describes is not meant to grow.
+ */
+public class FixedSizeLimitSpec extends ResizableLimitSpec {
+
+  /**
+   * @param fixedSize value passed as both size arguments of {@link ResizableLimitSpec}
+   */
+  public FixedSizeLimitSpec(long fixedSize) {
+    super(fixedSize, fixedSize);
+  }
+
+  /**
+   * @param fixedSize value passed as both size arguments of {@link ResizableLimitSpec}
+   * @param allowedOverflowRatio forwarded unchanged to {@link ResizableLimitSpec}
+   */
+  public FixedSizeLimitSpec(long fixedSize, float allowedOverflowRatio) {
+    super(fixedSize, fixedSize, allowedOverflowRatio);
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java
new file mode 100644
index 0000000..7c7d8a1
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java
@@ -0,0 +1,269 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.Message;
+import com.sun.tools.javac.util.Convert;
+import org.apache.tajo.datum.*;
+import org.apache.tajo.exception.UnsupportedException;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.util.SizeOf;
+import org.apache.tajo.util.UnsafeUtil;
+import sun.misc.Unsafe;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+
+import static org.apache.tajo.common.TajoDataTypes.DataType;
+
+/**
+ * A read-only {@link Tuple} view over one serialized row stored in an on-heap byte array.
+ *
+ * <p>Layout as read by this class: a leading int (not interpreted here), followed by one int
+ * per field holding that field's byte offset within {@code data}, or
+ * {@link OffHeapRowBlock#NULL_FIELD_OFFSET} when the field is null. All values are read with
+ * {@link Unsafe}, i.e. in native byte order.
+ */
+public class HeapTuple implements Tuple {
+  private static final Unsafe UNSAFE = UnsafeUtil.unsafe;
+  private static final long BASE_OFFSET = Unsafe.ARRAY_BYTE_BASE_OFFSET;
+  /** TEXT fields are decoded as UTF-8 so results never depend on the platform default charset. */
+  private static final Charset UTF8 = Charset.forName("UTF-8");
+
+  private final byte [] data;
+  private final DataType [] types;
+
+  public HeapTuple(final byte [] bytes, final DataType [] types) {
+    this.data = bytes;
+    this.types = types;
+  }
+
+  /**
+   * Returns the number of fields in this tuple. This is the column count from the type array,
+   * not the byte length of the serialized row; {@link #getValues()} relies on it as a loop bound.
+   */
+  @Override
+  public int size() {
+    return types.length;
+  }
+
+  /** Wraps the whole backing array (header included) without copying. */
+  public ByteBuffer nioBuffer() {
+    return ByteBuffer.wrap(data);
+  }
+
+  /** Reads the offset slot of {@code fieldId}; slots start right after the leading int. */
+  private int getFieldOffset(int fieldId) {
+    return UNSAFE.getInt(data, BASE_OFFSET + SizeOf.SIZE_OF_INT + (fieldId * SizeOf.SIZE_OF_INT));
+  }
+
+  /** Returns the field's offset, failing fast instead of reading at the null marker. */
+  private int checkNullAndGetOffset(int fieldId) {
+    int offset = getFieldOffset(fieldId);
+    if (offset == OffHeapRowBlock.NULL_FIELD_OFFSET) {
+      throw new RuntimeException("Invalid Field Access: " + fieldId);
+    }
+    return offset;
+  }
+
+  @Override
+  public boolean contains(int fieldid) {
+    return getFieldOffset(fieldid) > OffHeapRowBlock.NULL_FIELD_OFFSET;
+  }
+
+  @Override
+  public boolean isNull(int fieldid) {
+    return getFieldOffset(fieldid) == OffHeapRowBlock.NULL_FIELD_OFFSET;
+  }
+
+  @Override
+  public boolean isNotNull(int fieldid) {
+    return getFieldOffset(fieldid) > OffHeapRowBlock.NULL_FIELD_OFFSET;
+  }
+
+  @Override
+  public void clear() {
+    // nothing to do: the tuple is a read-only view
+  }
+
+  @Override
+  public void put(int fieldId, Datum value) {
+    throw new UnsupportedException("HeapTuple does not support put(int, Datum).");
+  }
+
+  @Override
+  public void put(int fieldId, Datum[] values) {
+    throw new UnsupportedException("HeapTuple does not support put(int, Datum []).");
+  }
+
+  @Override
+  public void put(int fieldId, Tuple tuple) {
+    throw new UnsupportedException("HeapTuple does not support put(int, Tuple).");
+  }
+
+  @Override
+  public void put(Datum[] values) {
+    throw new UnsupportedException("HeapTuple does not support put(Datum []).");
+  }
+
+  /**
+   * Materializes field {@code fieldId} as a {@link Datum} according to its declared type.
+   *
+   * @return {@link NullDatum} for null fields
+   * @throws UnsupportedException for types this tuple cannot decode
+   */
+  @Override
+  public Datum get(int fieldId) {
+    if (isNull(fieldId)) {
+      return NullDatum.get();
+    }
+
+    switch (types[fieldId].getType()) {
+      case BOOLEAN:
+        return DatumFactory.createBool(getBool(fieldId));
+      case INT1:
+        // INT1 is read as a 2-byte short, same as INT2 (presumably how the writer stores it)
+      case INT2:
+        return DatumFactory.createInt2(getInt2(fieldId));
+      case INT4:
+        return DatumFactory.createInt4(getInt4(fieldId));
+      case INT8:
+        // must read all 8 bytes; reading an int here would truncate the value
+        return DatumFactory.createInt8(getInt8(fieldId));
+      case FLOAT4:
+        return DatumFactory.createFloat4(getFloat4(fieldId));
+      case FLOAT8:
+        return DatumFactory.createFloat8(getFloat8(fieldId));
+      case TEXT:
+        return DatumFactory.createText(getText(fieldId));
+      case TIMESTAMP:
+        return DatumFactory.createTimestamp(getInt8(fieldId));
+      case DATE:
+        return DatumFactory.createDate(getInt4(fieldId));
+      case TIME:
+        return DatumFactory.createTime(getInt8(fieldId));
+      case INTERVAL:
+        return getInterval(fieldId);
+      case INET4:
+        return DatumFactory.createInet4(getInt4(fieldId));
+      case PROTOBUF:
+        return getProtobufDatum(fieldId);
+      default:
+        throw new UnsupportedException("Unknown type: " + types[fieldId]);
+    }
+  }
+
+  @Override
+  public void setOffset(long offset) {
+    // offsets are not tracked for heap tuples
+  }
+
+  @Override
+  public long getOffset() {
+    return 0;
+  }
+
+  @Override
+  public boolean getBool(int fieldId) {
+    return UNSAFE.getByte(data, BASE_OFFSET + checkNullAndGetOffset(fieldId)) == 0x01;
+  }
+
+  @Override
+  public byte getByte(int fieldId) {
+    return UNSAFE.getByte(data, BASE_OFFSET + checkNullAndGetOffset(fieldId));
+  }
+
+  @Override
+  public char getChar(int fieldId) {
+    return UNSAFE.getChar(data, BASE_OFFSET + checkNullAndGetOffset(fieldId));
+  }
+
+  /** Copies a length-prefixed (int length, then bytes) field into a new array. */
+  @Override
+  public byte[] getBytes(int fieldId) {
+    long pos = checkNullAndGetOffset(fieldId);
+    int len = UNSAFE.getInt(data, BASE_OFFSET + pos);
+    pos += SizeOf.SIZE_OF_INT;
+
+    byte [] bytes = new byte[len];
+    UNSAFE.copyMemory(data, BASE_OFFSET + pos, bytes, BASE_OFFSET, len);
+    return bytes;
+  }
+
+  @Override
+  public short getInt2(int fieldId) {
+    return UNSAFE.getShort(data, BASE_OFFSET + checkNullAndGetOffset(fieldId));
+  }
+
+  @Override
+  public int getInt4(int fieldId) {
+    return UNSAFE.getInt(data, BASE_OFFSET + checkNullAndGetOffset(fieldId));
+  }
+
+  @Override
+  public long getInt8(int fieldId) {
+    return UNSAFE.getLong(data, BASE_OFFSET + checkNullAndGetOffset(fieldId));
+  }
+
+  @Override
+  public float getFloat4(int fieldId) {
+    return UNSAFE.getFloat(data, BASE_OFFSET + checkNullAndGetOffset(fieldId));
+  }
+
+  @Override
+  public double getFloat8(int fieldId) {
+    return UNSAFE.getDouble(data, BASE_OFFSET + checkNullAndGetOffset(fieldId));
+  }
+
+  /** Decodes a length-prefixed TEXT field as UTF-8. */
+  @Override
+  public String getText(int fieldId) {
+    return new String(getBytes(fieldId), UTF8);
+  }
+
+  /** Reads an interval stored as an int month count followed by a long millisecond count. */
+  public IntervalDatum getInterval(int fieldId) {
+    long pos = checkNullAndGetOffset(fieldId);
+    int months = UNSAFE.getInt(data, BASE_OFFSET + pos);
+    pos += SizeOf.SIZE_OF_INT;
+    long millisecs = UNSAFE.getLong(data, BASE_OFFSET + pos);
+    return new IntervalDatum(months, millisecs);
+  }
+
+  /**
+   * Parses a length-prefixed protobuf payload with the factory registered for the field's type.
+   * NOTE(review): a malformed payload yields {@link NullDatum} rather than an error.
+   */
+  @Override
+  public Datum getProtobufDatum(int fieldId) {
+    byte [] bytes = getBytes(fieldId);
+
+    ProtobufDatumFactory factory = ProtobufDatumFactory.get(types[fieldId].getCode());
+    Message.Builder builder = factory.newBuilder();
+    try {
+      builder.mergeFrom(bytes);
+    } catch (InvalidProtocolBufferException e) {
+      return NullDatum.get();
+    }
+
+    return new ProtobufDatum(builder.build());
+  }
+
+  /**
+   * Decodes a length-prefixed UTF-8 field into chars using the public charset API rather than
+   * javac internals, which are not present on a plain JRE.
+   */
+  @Override
+  public char[] getUnicodeChars(int fieldId) {
+    return getText(fieldId).toCharArray();
+  }
+
+  /** The tuple is immutable (all put methods throw), so it is shared rather than copied. */
+  @Override
+  public Tuple clone() throws CloneNotSupportedException {
+    return this;
+  }
+
+  @Override
+  public Datum[] getValues() {
+    Datum [] datums = new Datum[size()];
+    for (int i = 0; i < size(); i++) {
+      if (contains(i)) {
+        datums[i] = get(i);
+      } else {
+        datums[i] = NullDatum.get();
+      }
+    }
+    return datums;
+  }
+
+  @Override
+  public String toString() {
+    return VTuple.toDisplayString(getValues());
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapMemory.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapMemory.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapMemory.java
new file mode 100644
index 0000000..2f8e349
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapMemory.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.util.Deallocatable;
+import org.apache.tajo.util.FileUtil;
+import org.apache.tajo.util.UnsafeUtil;
+import sun.misc.Unsafe;
+import sun.nio.ch.DirectBuffer;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+/**
+ * A growable region of direct (off-heap) memory backed by a direct {@link ByteBuffer}, whose
+ * native address is cached in {@link #address} for {@link Unsafe} access. Growth is bounded by
+ * a {@link ResizableLimitSpec}. The memory is freed explicitly via {@link #release()}.
+ */
+public class OffHeapMemory implements Deallocatable {
+  private static final Log LOG = LogFactory.getLog(OffHeapMemory.class);
+
+  protected static final Unsafe UNSAFE = UnsafeUtil.unsafe;
+
+  protected ByteBuffer buffer;
+  protected int memorySize;
+  protected ResizableLimitSpec limitSpec;
+  protected long address;
+
+  /**
+   * Wraps an existing buffer. The buffer must be direct: its native address is read through
+   * {@link DirectBuffer}.
+   */
+  @VisibleForTesting
+  protected OffHeapMemory(ByteBuffer buffer, ResizableLimitSpec limitSpec) {
+    this.buffer = buffer;
+    this.address = ((DirectBuffer) buffer).address();
+    this.memorySize = buffer.limit();
+    this.limitSpec = limitSpec;
+  }
+
+  /** Allocates {@code limitSpec.initialSize()} bytes of direct memory in native byte order. */
+  public OffHeapMemory(ResizableLimitSpec limitSpec) {
+    this(ByteBuffer.allocateDirect((int) limitSpec.initialSize()).order(ByteOrder.nativeOrder()), limitSpec);
+  }
+
+  public long address() {
+    return address;
+  }
+
+  public long size() {
+    return memorySize;
+  }
+
+  /**
+   * Grows the memory to {@code newSize} bytes, copying the current contents into a new
+   * native-ordered direct buffer and freeing the old one. Requests smaller than the current
+   * size are ignored (with a warning).
+   *
+   * @throws IllegalArgumentException if {@code newSize} is not positive
+   * @throws IllegalStateException if the memory has already been released
+   * @throws RuntimeException if {@code newSize} exceeds the limit of the spec
+   */
+  public void resize(int newSize) {
+    Preconditions.checkArgument(newSize > 0, "Size must be greater than 0 bytes");
+    Preconditions.checkState(buffer != null, "Memory has already been released");
+
+    if (newSize > limitSpec.limit()) {
+      throw new RuntimeException("Resize cannot exceed the size limit");
+    }
+
+    if (newSize < memorySize) {
+      // Shrinking is not supported: copying memorySize bytes into a smaller allocation would
+      // write past the end of the new buffer.
+      LOG.warn("The size reduction is ignored.");
+      return;
+    }
+
+    int newBlockSize = UnsafeUtil.alignedSize(newSize);
+    // keep the same byte order as the constructor so nioBuffer() views stay consistent
+    ByteBuffer newByteBuf = ByteBuffer.allocateDirect(newBlockSize).order(ByteOrder.nativeOrder());
+    long newAddress = ((DirectBuffer) newByteBuf).address();
+
+    UNSAFE.copyMemory(this.address, newAddress, memorySize);
+
+    UnsafeUtil.free(buffer);
+    this.memorySize = newSize;
+    this.buffer = newByteBuf;
+    this.address = newAddress;
+  }
+
+  /** Returns the backing buffer with position 0 and limit {@code memorySize}. */
+  public java.nio.Buffer nioBuffer() {
+    return (ByteBuffer) buffer.position(0).limit(memorySize);
+  }
+
+  /** Frees the direct memory. Safe to call more than once; later calls do nothing. */
+  @Override
+  public void release() {
+    if (this.buffer != null) {
+      UnsafeUtil.free(this.buffer);
+    }
+    this.buffer = null;
+    this.address = 0;
+    this.memorySize = 0;
+  }
+
+  @Override
+  public String toString() {
+    return "memory=" + FileUtil.humanReadableByteCount(memorySize, false) + "," + limitSpec;
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlock.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlock.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlock.java
new file mode 100644
index 0000000..689efb7
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlock.java
@@ -0,0 +1,176 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SchemaUtil;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.util.Deallocatable;
+import org.apache.tajo.util.FileUtil;
+import org.apache.tajo.util.SizeOf;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+import static org.apache.tajo.common.TajoDataTypes.DataType;
+
+/**
+ * A block of serialized rows held in direct (off-heap) memory. Rows are written through the
+ * {@link RowWriter} returned by {@link #getWriter()} and read through {@link #getReader()};
+ * {@link #position()} marks the end of the complete rows currently in the block.
+ */
+public class OffHeapRowBlock extends OffHeapMemory implements Deallocatable {
+  private static final Log LOG = LogFactory.getLog(OffHeapRowBlock.class);
+
+  /** Field-offset value that marks a null field in a serialized row. */
+  public static final int NULL_FIELD_OFFSET = -1;
+
+  DataType [] dataTypes;
+
+  // Basic States
+  private int maxRowNum = Integer.MAX_VALUE; // optional
+  private int rowNum;
+  protected int position = 0;
+
+  private OffHeapRowBlockWriter builder;
+
+  private OffHeapRowBlock(ByteBuffer buffer, Schema schema, ResizableLimitSpec limitSpec) {
+    super(buffer, limitSpec);
+    initialize(schema);
+  }
+
+  public OffHeapRowBlock(Schema schema, ResizableLimitSpec limitSpec) {
+    super(limitSpec);
+    initialize(schema);
+  }
+
+  private void initialize(Schema schema) {
+    dataTypes = SchemaUtil.toDataTypes(schema);
+
+    this.builder = new OffHeapRowBlockWriter(this);
+  }
+
+  @VisibleForTesting
+  public OffHeapRowBlock(Schema schema, int bytes) {
+    this(schema, new ResizableLimitSpec(bytes));
+  }
+
+  @VisibleForTesting
+  public OffHeapRowBlock(Schema schema, ByteBuffer buffer) {
+    this(buffer, schema, ResizableLimitSpec.DEFAULT_LIMIT);
+  }
+
+  public void position(int pos) {
+    this.position = pos;
+  }
+
+  /** Resets the block to empty (position and row count to 0) and clears the writer. */
+  public void clear() {
+    this.position = 0;
+    this.rowNum = 0;
+
+    builder.clear();
+  }
+
+  /** Returns the backing buffer limited to the complete rows, i.e. {@code [0, position)}. */
+  @Override
+  public ByteBuffer nioBuffer() {
+    return (ByteBuffer) buffer.position(0).limit(position);
+  }
+
+  public int position() {
+    return position;
+  }
+
+  public long usedMem() {
+    return position;
+  }
+
+  /**
+   * Ensures that this block has at least {@code size} bytes of remaining space, growing it as
+   * many times as the limit spec allows. A single growth step may not be enough for a large
+   * request, so growth repeats until the request fits.
+   *
+   * @param size Size to add
+   * @throws RuntimeException if the block cannot grow enough to fit {@code size} bytes
+   */
+  public void ensureSize(int size) {
+    while (remain() < size) {
+      if (!limitSpec.canIncrease(memorySize)) {
+        throw new RuntimeException("Cannot increase RowBlock anymore.");
+      }
+
+      int newBlockSize = limitSpec.increasedSize(memorySize);
+      if (newBlockSize <= memorySize) {
+        // a spec that reports it can grow but yields no larger size would loop forever
+        throw new RuntimeException("Cannot increase RowBlock anymore.");
+      }
+      resize(newBlockSize);
+      LOG.info("Increase DirectRowBlock to " + FileUtil.humanReadableByteCount(newBlockSize, false));
+    }
+  }
+
+  /** Bytes left after the complete rows and whatever the writer has in progress. */
+  public long remain() {
+    return memorySize - position - builder.offset();
+  }
+
+  public int maxRowNum() {
+    return maxRowNum;
+  }
+
+  public int rows() {
+    return rowNum;
+  }
+
+  public void setRows(int rowNum) {
+    this.rowNum = rowNum;
+  }
+
+  /**
+   * Replaces this block's contents with the next chunk of serialized rows read from
+   * {@code channel}. Each record starts with an int holding its total size. Only whole records
+   * are kept: if the chunk ends inside a record, the channel is moved back to that record's
+   * start so the next call reads it again.
+   *
+   * @param channel source channel, read from its current position
+   * @param stats currently unused
+   * @return {@code true} if a chunk was read, {@code false} if the channel was already at its end
+   * @throws IOException on read failure, or if a record header holds a size smaller than the
+   *         header itself (such a record could never be skipped)
+   */
+  public boolean copyFromChannel(FileChannel channel, TableStats stats) throws IOException {
+    if (channel.position() >= channel.size()) {
+      return false;
+    }
+
+    clear();
+
+    buffer.clear();
+    channel.read(buffer);
+    memorySize = buffer.position();
+
+    while (position < memorySize) {
+      long recordPtr = address + position;
+
+      if (remain() < SizeOf.SIZE_OF_INT) {
+        rewindPartialRecord(channel);
+        return true;
+      }
+
+      int recordSize = UNSAFE.getInt(recordPtr);
+      if (recordSize < SizeOf.SIZE_OF_INT) {
+        // position would never move past this record, so the loop would not terminate
+        throw new IOException("Invalid record size " + recordSize + " at block offset " + position);
+      }
+
+      if (remain() < recordSize) {
+        rewindPartialRecord(channel);
+        return true;
+      }
+
+      position += recordSize;
+      rowNum++;
+    }
+
+    return true;
+  }
+
+  /** Drops the trailing incomplete record from the block and moves the channel back over it. */
+  private void rewindPartialRecord(FileChannel channel) throws IOException {
+    long partial = remain();
+    channel.position(channel.position() - partial);
+    memorySize = (int) (memorySize - partial);
+  }
+
+  public RowWriter getWriter() {
+    return builder;
+  }
+
+  public OffHeapRowBlockReader getReader() {
+    return new OffHeapRowBlockReader(this);
+  }
+}