You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by jh...@apache.org on 2014/01/28 13:35:44 UTC
[07/18] TAJO-520: Move tajo-core-storage to tajo-storage. (jinho)
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/main/java/org/apache/tajo/storage/TableStatistics.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/TableStatistics.java b/tajo-storage/src/main/java/org/apache/tajo/storage/TableStatistics.java
new file mode 100644
index 0000000..41c9d61
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/TableStatistics.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.statistics.ColumnStats;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.common.TajoDataTypes.DataType;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.NullDatum;
+
+/**
+ * This class is not thread-safe.
+ */
+public class TableStatistics {
+ private Schema schema;
+ private Tuple minValues;
+ private Tuple maxValues;
+ private long [] numNulls;
+ private long numRows = 0;
+ private long numBytes = 0;
+
+
+ private boolean [] comparable;
+
+ public TableStatistics(Schema schema) {
+ this.schema = schema;
+ minValues = new VTuple(schema.getColumnNum());
+ maxValues = new VTuple(schema.getColumnNum());
+
+ numNulls = new long[schema.getColumnNum()];
+ comparable = new boolean[schema.getColumnNum()];
+
+ DataType type;
+ for (int i = 0; i < schema.getColumnNum(); i++) {
+ type = schema.getColumn(i).getDataType();
+ if (type.getType() == Type.PROTOBUF) {
+ comparable[i] = false;
+ } else {
+ comparable[i] = true;
+ }
+ }
+ }
+
+ public Schema getSchema() {
+ return this.schema;
+ }
+
+ public void incrementRow() {
+ numRows++;
+ }
+
+ public long getNumRows() {
+ return this.numRows;
+ }
+
+ public void setNumBytes(long bytes) {
+ this.numBytes = bytes;
+ }
+
+ public long getNumBytes() {
+ return this.numBytes;
+ }
+
+ public void analyzeField(int idx, Datum datum) {
+ if (datum instanceof NullDatum) {
+ numNulls[idx]++;
+ return;
+ }
+
+ if (comparable[idx]) {
+ if (!maxValues.contains(idx) ||
+ maxValues.get(idx).compareTo(datum) < 0) {
+ maxValues.put(idx, datum);
+ }
+ if (!minValues.contains(idx) ||
+ minValues.get(idx).compareTo(datum) > 0) {
+ minValues.put(idx, datum);
+ }
+ }
+ }
+
+ public TableStats getTableStat() {
+ TableStats stat = new TableStats();
+
+ ColumnStats columnStats;
+ for (int i = 0; i < schema.getColumnNum(); i++) {
+ columnStats = new ColumnStats(schema.getColumn(i));
+ columnStats.setNumNulls(numNulls[i]);
+ columnStats.setMinValue(minValues.get(i));
+ columnStats.setMaxValue(maxValues.get(i));
+ stat.addColumnStat(columnStats);
+ }
+
+ stat.setNumRows(this.numRows);
+ stat.setNumBytes(this.numBytes);
+
+ return stat;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java
new file mode 100644
index 0000000..07ea79b
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import com.google.protobuf.Message;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.datum.*;
+import org.apache.tajo.datum.protobuf.ProtobufJsonFormat;
+import org.apache.tajo.util.Bytes;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+//Compatibility with Apache Hive
+public class TextSerializerDeserializer implements SerializerDeserializer {
+ public static final byte[] trueBytes = "true".getBytes();
+ public static final byte[] falseBytes = "false".getBytes();
+ private ProtobufJsonFormat protobufJsonFormat = ProtobufJsonFormat.getInstance();
+
+
+ @Override
+ public int serialize(Column col, Datum datum, OutputStream out, byte[] nullCharacters) throws IOException {
+
+ byte[] bytes;
+ int length = 0;
+ TajoDataTypes.DataType dataType = col.getDataType();
+
+ if (datum == null || datum instanceof NullDatum) {
+ switch (dataType.getType()) {
+ case CHAR:
+ case TEXT:
+ length = nullCharacters.length;
+ out.write(nullCharacters);
+ break;
+ default:
+ break;
+ }
+ return length;
+ }
+
+ switch (dataType.getType()) {
+ case BOOLEAN:
+ out.write(datum.asBool() ? trueBytes : falseBytes);
+ length = trueBytes.length;
+ break;
+ case CHAR:
+ byte[] pad = new byte[dataType.getLength() - datum.size()];
+ bytes = datum.asTextBytes();
+ out.write(bytes);
+ out.write(pad);
+ length = bytes.length + pad.length;
+ break;
+ case TEXT:
+ case BIT:
+ case INT2:
+ case INT4:
+ case INT8:
+ case FLOAT4:
+ case FLOAT8:
+ case INET4:
+ case DATE:
+ case TIME:
+ case TIMESTAMP:
+ bytes = datum.asTextBytes();
+ length = bytes.length;
+ out.write(bytes);
+ break;
+ case INET6:
+ case BLOB:
+ bytes = Base64.encodeBase64(datum.asByteArray(), false);
+ length = bytes.length;
+ out.write(bytes, 0, length);
+ break;
+ case PROTOBUF:
+ ProtobufDatum protobuf = (ProtobufDatum) datum;
+ byte[] protoBytes = protobufJsonFormat.printToString(protobuf.get()).getBytes();
+ length = protoBytes.length;
+ out.write(protoBytes, 0, protoBytes.length);
+ break;
+ case NULL_TYPE:
+ default:
+ break;
+ }
+ return length;
+ }
+
+ @Override
+ public Datum deserialize(Column col, byte[] bytes, int offset, int length, byte[] nullCharacters) throws IOException {
+
+ Datum datum;
+ switch (col.getDataType().getType()) {
+ case BOOLEAN:
+ datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
+ : DatumFactory.createBool(bytes[offset] == 't' || bytes[offset] == 'T');
+ break;
+ case BIT:
+ datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
+ : DatumFactory.createBit(Byte.parseByte(new String(bytes, offset, length)));
+ break;
+ case CHAR:
+ datum = isNullText(bytes, offset, length, nullCharacters) ? NullDatum.get()
+ : DatumFactory.createChar(new String(bytes, offset, length).trim());
+ break;
+ case INT2:
+ datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
+ : DatumFactory.createInt2(new String(bytes, offset, length));
+ break;
+ case INT4:
+ datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
+ : DatumFactory.createInt4(new String(bytes, offset, length));
+ break;
+ case INT8:
+ datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
+ : DatumFactory.createInt8(new String(bytes, offset, length));
+ break;
+ case FLOAT4:
+ datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
+ : DatumFactory.createFloat4(new String(bytes, offset, length));
+ break;
+ case FLOAT8:
+ datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
+ : DatumFactory.createFloat8(new String(bytes, offset, length));
+ break;
+ case TEXT: {
+ byte[] chars = new byte[length];
+ System.arraycopy(bytes, offset, chars, 0, length);
+ datum = isNullText(bytes, offset, length, nullCharacters) ? NullDatum.get()
+ : DatumFactory.createText(chars);
+ break;
+ }
+ case DATE:
+ datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
+ : DatumFactory.createDate(new String(bytes, offset, length));
+ break;
+ case TIME:
+ datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
+ : DatumFactory.createTime(new String(bytes, offset, length));
+ break;
+ case TIMESTAMP:
+ datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
+ : DatumFactory.createTimeStamp(new String(bytes, offset, length));
+ break;
+ case PROTOBUF: {
+ if (isNull(bytes, offset, length, nullCharacters)) {
+ datum = NullDatum.get();
+ } else {
+ ProtobufDatumFactory factory = ProtobufDatumFactory.get(col.getDataType());
+ Message.Builder builder = factory.newBuilder();
+ try {
+ byte[] protoBytes = new byte[length];
+ System.arraycopy(bytes, offset, protoBytes, 0, length);
+ protobufJsonFormat.merge(protoBytes, builder);
+ datum = factory.createDatum(builder.build());
+ } catch (IOException e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+ }
+ break;
+ }
+ case INET4:
+ datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
+ : DatumFactory.createInet4(new String(bytes, offset, length));
+ break;
+ case BLOB: {
+ if (isNull(bytes, offset, length, nullCharacters)) {
+ datum = NullDatum.get();
+ } else {
+ byte[] blob = new byte[length];
+ System.arraycopy(bytes, offset, blob, 0, length);
+ datum = DatumFactory.createBlob(Base64.decodeBase64(blob));
+ }
+ break;
+ }
+ default:
+ datum = NullDatum.get();
+ break;
+ }
+ return datum;
+ }
+
+ private static boolean isNull(byte[] val, int offset, int length, byte[] nullBytes) {
+ return length == 0 || ((length == nullBytes.length)
+ && Bytes.equals(val, offset, length, nullBytes, 0, nullBytes.length));
+ }
+
+ private static boolean isNullText(byte[] val, int offset, int length, byte[] nullBytes) {
+ return length > 0 && length == nullBytes.length
+ && Bytes.equals(val, offset, length, nullBytes, 0, nullBytes.length);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/main/java/org/apache/tajo/storage/Tuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/Tuple.java b/tajo-storage/src/main/java/org/apache/tajo/storage/Tuple.java
new file mode 100644
index 0000000..ba35988
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/Tuple.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.tajo.datum.*;
+
+import java.net.InetAddress;
+
/**
 * A single row of data: an ordered, fixed-size sequence of {@link Datum}
 * values addressed by zero-based field id.
 *
 * NOTE(review): the typed getters presumably require the stored datum to
 * already be of the requested type — confirm against the implementations
 * (e.g. {@code VTuple} simply casts).
 */
public interface Tuple extends Cloneable {

  /** Returns the number of fields this tuple can hold. */
  public int size();

  /** Returns true if the field at {@code fieldid} has been set. */
  public boolean contains(int fieldid);

  /** Returns true if the field at {@code fieldid} holds a null datum. */
  public boolean isNull(int fieldid);

  /** Unsets every field. */
  public void clear();

  /** Sets the field at {@code fieldId} to {@code value}. */
  public void put(int fieldId, Datum value);

  /** Copies {@code values} into consecutive fields starting at {@code fieldId}. */
  public void put(int fieldId, Datum [] values);

  /** Copies {@code tuple}'s fields into consecutive fields starting at {@code fieldId}. */
  public void put(int fieldId, Tuple tuple);

  /** Copies {@code values} into this tuple starting at field 0. */
  public void put(Datum [] values);

  /** Returns the datum stored at {@code fieldId}. */
  public Datum get(int fieldId);

  /** Records the storage offset this tuple was read from. */
  public void setOffset(long offset);

  /** Returns the storage offset recorded by {@link #setOffset(long)}. */
  public long getOffset();

  public BooleanDatum getBoolean(int fieldId);

  public BitDatum getByte(int fieldId);

  public CharDatum getChar(int fieldId);

  public BlobDatum getBytes(int fieldId);

  public Int2Datum getShort(int fieldId);

  public Int4Datum getInt(int fieldId);

  public Int8Datum getLong(int fieldId);

  public Float4Datum getFloat(int fieldId);

  public Float8Datum getDouble(int fieldId);

  public Inet4Datum getIPv4(int fieldId);

  public byte [] getIPv4Bytes(int fieldId);

  public InetAddress getIPv6(int fieldId);

  public byte [] getIPv6Bytes(int fieldId);

  public TextDatum getString(int fieldId);

  public TextDatum getText(int fieldId);

  public Tuple clone() throws CloneNotSupportedException;

  /** Returns the underlying datum array. */
  public Datum[] getValues();
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/main/java/org/apache/tajo/storage/TupleComparator.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/TupleComparator.java b/tajo-storage/src/main/java/org/apache/tajo/storage/TupleComparator.java
new file mode 100644
index 0000000..69c1c04
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/TupleComparator.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.common.ProtoObject;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.NullDatum;
+
+import java.util.Comparator;
+
+import static org.apache.tajo.catalog.proto.CatalogProtos.TupleComparatorSpecProto;
+import static org.apache.tajo.index.IndexProtos.TupleComparatorProto;
+
+/**
+ * The Comparator class for Tuples
+ *
+ * @see Tuple
+ */
+public class TupleComparator implements Comparator<Tuple>, ProtoObject<TupleComparatorProto> {
+ private final int[] sortKeyIds;
+ private final boolean[] asc;
+ @SuppressWarnings("unused")
+ private final boolean[] nullFirsts;
+
+ private Datum left;
+ private Datum right;
+ private int compVal;
+
+ public TupleComparator(Schema schema, SortSpec[] sortKeys) {
+ Preconditions.checkArgument(sortKeys.length > 0,
+ "At least one sort key must be specified.");
+
+ this.sortKeyIds = new int[sortKeys.length];
+ this.asc = new boolean[sortKeys.length];
+ this.nullFirsts = new boolean[sortKeys.length];
+ for (int i = 0; i < sortKeys.length; i++) {
+ this.sortKeyIds[i] = schema.getColumnId(sortKeys[i].getSortKey().getQualifiedName());
+
+ this.asc[i] = sortKeys[i].isAscending();
+ this.nullFirsts[i]= sortKeys[i].isNullFirst();
+ }
+ }
+
+ public TupleComparator(TupleComparatorProto proto) {
+ this.sortKeyIds = new int[proto.getCompSpecsCount()];
+ this.asc = new boolean[proto.getCompSpecsCount()];
+ this.nullFirsts = new boolean[proto.getCompSpecsCount()];
+
+ for (int i = 0; i < proto.getCompSpecsCount(); i++) {
+ TupleComparatorSpecProto sortSepcProto = proto.getCompSpecs(i);
+ sortKeyIds[i] = sortSepcProto.getColumnId();
+ asc[i] = sortSepcProto.getAscending();
+ nullFirsts[i] = sortSepcProto.getNullFirst();
+ }
+ }
+
+ public boolean isAscendingFirstKey() {
+ return this.asc[0];
+ }
+
+ @Override
+ public int compare(Tuple tuple1, Tuple tuple2) {
+ for (int i = 0; i < sortKeyIds.length; i++) {
+ left = tuple1.get(sortKeyIds[i]);
+ right = tuple2.get(sortKeyIds[i]);
+
+ if (left instanceof NullDatum || right instanceof NullDatum) {
+ if (!left.equals(right)) {
+ if (left instanceof NullDatum) {
+ compVal = 1;
+ } else if (right instanceof NullDatum) {
+ compVal = -1;
+ }
+ if (nullFirsts[i]) {
+ if (compVal != 0) {
+ compVal *= -1;
+ }
+ }
+ } else {
+ compVal = 0;
+ }
+ } else {
+ if (asc[i]) {
+ compVal = left.compareTo(right);
+ } else {
+ compVal = right.compareTo(left);
+ }
+ }
+
+ if (compVal < 0 || compVal > 0) {
+ return compVal;
+ }
+ }
+ return 0;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(sortKeyIds);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof TupleComparator) {
+ TupleComparator other = (TupleComparator) obj;
+ if (sortKeyIds.length != other.sortKeyIds.length) {
+ return false;
+ }
+
+ for (int i = 0; i < sortKeyIds.length; i++) {
+ if (sortKeyIds[i] != other.sortKeyIds[i] ||
+ asc[i] != other.asc[i] ||
+ nullFirsts[i] != other.nullFirsts[i]) {
+ return false;
+ }
+ }
+
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public TupleComparatorProto getProto() {
+ TupleComparatorProto.Builder builder = TupleComparatorProto.newBuilder();
+ TupleComparatorSpecProto.Builder sortSpecBuilder;
+
+ for (int i = 0; i < sortKeyIds.length; i++) {
+ sortSpecBuilder = TupleComparatorSpecProto.newBuilder();
+ sortSpecBuilder.setColumnId(sortKeyIds[i]);
+ sortSpecBuilder.setAscending(asc[i]);
+ sortSpecBuilder.setNullFirst(nullFirsts[i]);
+ builder.addCompSpecs(sortSpecBuilder);
+ }
+
+ return builder.build();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/main/java/org/apache/tajo/storage/TupleRange.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/TupleRange.java b/tajo-storage/src/main/java/org/apache/tajo/storage/TupleRange.java
new file mode 100644
index 0000000..7d0f674
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/TupleRange.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SortSpec;
+
+import java.util.Comparator;
+
+public class TupleRange implements Comparable<TupleRange> {
+ private final Schema schema;
+ private final Tuple start;
+ private final Tuple end;
+ private final TupleComparator comp;
+
+ public TupleRange(final Schema schema, final Tuple start, final Tuple end) {
+ this.comp = new TupleComparator(schema, schemaToSortSpecs(schema));
+ // if there is only one value, start == end
+ Preconditions.checkArgument(comp.compare(start, end) <= 0, ("start=" + start) + ", end=" + end);
+ this.schema = schema;
+ this.start = start;
+ this.end = end;
+ }
+
+ public static SortSpec[] schemaToSortSpecs(Schema schema) {
+ SortSpec[] specs = new SortSpec[schema.getColumnNum()];
+
+ for (int i = 0; i < schema.getColumnNum(); i++) {
+ specs[i] = new SortSpec(schema.getColumn(i), true, false);
+ }
+
+ return specs;
+ }
+
+ public final Schema getSchema() {
+ return this.schema;
+ }
+
+ public final Tuple getStart() {
+ return this.start;
+ }
+
+ public final Tuple getEnd() {
+ return this.end;
+ }
+
+ public String toString() {
+ return "[" + this.start + ", " + this.end+")";
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(start, end);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof TupleRange) {
+ TupleRange other = (TupleRange) obj;
+ return this.start.equals(other.start) && this.end.equals(other.end);
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public int compareTo(TupleRange o) {
+ // TODO - should handle overlap
+ int cmpVal = comp.compare(this.start, o.start);
+ if (cmpVal != 0) {
+ return cmpVal;
+ } else {
+ return comp.compare(this.end, o.end);
+ }
+ }
+
+ public static class DescendingTupleRangeComparator
+ implements Comparator<TupleRange> {
+
+ @Override
+ public int compare(TupleRange left, TupleRange right) {
+ return -(left.compareTo(right));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/main/java/org/apache/tajo/storage/VTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/VTuple.java b/tajo-storage/src/main/java/org/apache/tajo/storage/VTuple.java
new file mode 100644
index 0000000..878c05e
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/VTuple.java
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import com.google.gson.annotations.Expose;
+import org.apache.tajo.datum.*;
+import org.apache.tajo.datum.exception.InvalidCastException;
+
+import java.net.InetAddress;
+import java.util.Arrays;
+
+public class VTuple implements Tuple, Cloneable {
+ @Expose public Datum [] values;
+ @Expose private long offset;
+
+ public VTuple(int size) {
+ values = new Datum[size];
+ }
+
+ public VTuple(Tuple tuple) {
+ this.values = new Datum[tuple.size()];
+ System.arraycopy(((VTuple)tuple).values, 0, values, 0, tuple.size());
+ this.offset = ((VTuple)tuple).offset;
+ }
+
+ public VTuple(Datum [] datum) {
+ this(datum.length);
+ put(datum);
+ }
+
+ @Override
+ public int size() {
+ return values.length;
+ }
+
+ public boolean contains(int fieldId) {
+ return values[fieldId] != null;
+ }
+
+ @Override
+ public boolean isNull(int fieldid) {
+ return values[fieldid] instanceof NullDatum;
+ }
+
+ @Override
+ public void clear() {
+ for (int i=0; i < values.length; i++) {
+ values[i] = null;
+ }
+ }
+
+ //////////////////////////////////////////////////////
+ // Setter
+ //////////////////////////////////////////////////////
+ public void put(int fieldId, Datum value) {
+ values[fieldId] = value;
+ }
+
+ @Override
+ public void put(int fieldId, Datum[] values) {
+ for (int i = fieldId, j = 0; j < values.length; i++, j++) {
+ values[i] = values[j];
+ }
+ }
+
+ @Override
+ public void put(int fieldId, Tuple tuple) {
+ for (int i = fieldId, j = 0; j < tuple.size(); i++, j++) {
+ values[i] = tuple.get(j);
+ }
+ }
+
+ public void put(Datum [] values) {
+ System.arraycopy(values, 0, this.values, 0, size());
+ }
+
+ //////////////////////////////////////////////////////
+ // Getter
+ //////////////////////////////////////////////////////
+ public Datum get(int fieldId) {
+ return this.values[fieldId];
+ }
+
+ public void setOffset(long offset) {
+ this.offset = offset;
+ }
+
+ public long getOffset() {
+ return this.offset;
+ }
+
+ @Override
+ public BooleanDatum getBoolean(int fieldId) {
+ return (BooleanDatum) values[fieldId];
+ }
+
+ public BitDatum getByte(int fieldId) {
+ return (BitDatum) values[fieldId];
+ }
+
+ public CharDatum getChar(int fieldId) {
+ return (CharDatum) values[fieldId];
+ }
+
+ public BlobDatum getBytes(int fieldId) {
+ return (BlobDatum) values[fieldId];
+ }
+
+ public Int2Datum getShort(int fieldId) {
+ return (Int2Datum) values[fieldId];
+ }
+
+ public Int4Datum getInt(int fieldId) {
+ return (Int4Datum) values[fieldId];
+ }
+
+ public Int8Datum getLong(int fieldId) {
+ return (Int8Datum) values[fieldId];
+ }
+
+ public Float4Datum getFloat(int fieldId) {
+ return (Float4Datum) values[fieldId];
+ }
+
+ public Float8Datum getDouble(int fieldId) {
+ return (Float8Datum) values[fieldId];
+ }
+
+ public Inet4Datum getIPv4(int fieldId) {
+ return (Inet4Datum) values[fieldId];
+ }
+
+ public byte[] getIPv4Bytes(int fieldId) {
+ return values[fieldId].asByteArray();
+ }
+
+ public InetAddress getIPv6(int fieldId) {
+ throw new InvalidCastException("IPv6 is unsupported yet");
+ }
+
+ public byte[] getIPv6Bytes(int fieldId) {
+ throw new InvalidCastException("IPv6 is unsupported yet");
+ }
+
+ public TextDatum getString(int fieldId) {
+ return (TextDatum) values[fieldId];
+ }
+
+ @Override
+ public TextDatum getText(int fieldId) {
+ return (TextDatum) values[fieldId];
+ }
+
+ @Override
+ public Tuple clone() throws CloneNotSupportedException {
+ VTuple tuple = (VTuple) super.clone();
+
+ tuple.values = new Datum[size()];
+ System.arraycopy(values, 0, tuple.values, 0, size()); //shallow copy
+ return tuple;
+ }
+
+ public String toString() {
+ boolean first = true;
+ StringBuilder str = new StringBuilder();
+ str.append("(");
+ for(int i=0; i < values.length; i++) {
+ if(values[i] != null) {
+ if(first) {
+ first = false;
+ } else {
+ str.append(", ");
+ }
+ str.append(i)
+ .append("=>")
+ .append(values[i]);
+ }
+ }
+ str.append(")");
+ return str.toString();
+ }
+
+ @Override
+ public int hashCode() {
+ int hashCode = 37;
+ for (int i=0; i < values.length; i++) {
+ if(values[i] != null) {
+ hashCode ^= (values[i].hashCode() * 41);
+ } else {
+ hashCode = hashCode ^ (i + 17);
+ }
+ }
+
+ return hashCode;
+ }
+
+ @Override
+ public Datum[] getValues() {
+ return values;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof Tuple) {
+ Tuple other = (Tuple) obj;
+ return Arrays.equals(getValues(), other.getValues());
+ }
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/main/java/org/apache/tajo/storage/annotation/ForSplitableStore.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/annotation/ForSplitableStore.java b/tajo-storage/src/main/java/org/apache/tajo/storage/annotation/ForSplitableStore.java
new file mode 100644
index 0000000..ad19101
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/annotation/ForSplitableStore.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.annotation;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
/**
 * Marker annotation for storage classes whose underlying store can be split
 * into fragments for parallel scanning.
 *
 * NOTE(review): runtime retention suggests it is looked up reflectively —
 * confirm against the storage-manager code that consumes it.
 */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface ForSplitableStore {
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/main/java/org/apache/tajo/storage/compress/CodecPool.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/compress/CodecPool.java b/tajo-storage/src/main/java/org/apache/tajo/storage/compress/CodecPool.java
new file mode 100644
index 0000000..baeda8c
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/compress/CodecPool.java
@@ -0,0 +1,185 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.tajo.storage.compress;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.DoNotPool;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * A global compressor/decompressor pool used to save and reuse (possibly
 * native) compression/decompression codecs.
 *
 * <p>All pool state is guarded by the monitor of the corresponding pool map;
 * the per-codec lists never escape this class, so no additional locking on
 * the lists themselves is required.
 */
public final class CodecPool {
  private static final Log LOG = LogFactory.getLog(CodecPool.class);

  /**
   * A global compressor pool used to save the expensive
   * construction/destruction of (possibly native) compression codecs.
   */
  private static final Map<Class<Compressor>, List<Compressor>> COMPRESSOR_POOL =
      new HashMap<Class<Compressor>, List<Compressor>>();

  /**
   * A global decompressor pool used to save the expensive
   * construction/destruction of (possibly native) decompression codecs.
   */
  private static final Map<Class<Decompressor>, List<Decompressor>> DECOMPRESSOR_POOL =
      new HashMap<Class<Decompressor>, List<Decompressor>>();

  /**
   * Takes a pooled codec instance of the given class, or returns {@code null}
   * if none is available.
   */
  private static <T> T borrow(Map<Class<T>, List<T>> pool,
      Class<? extends T> codecClass) {
    // Single map lookup under the pool monitor (the original code performed a
    // redundant containsKey + get, and took a second, unnecessary lock on the
    // list, which is only ever touched while the pool monitor is held).
    synchronized (pool) {
      List<T> codecList = pool.get(codecClass);
      if (codecList != null && !codecList.isEmpty()) {
        return codecList.remove(codecList.size() - 1);
      }
    }
    return null;
  }

  /**
   * Returns a codec instance to the pool, creating the per-class list on
   * first use. A {@code null} codec is silently ignored.
   */
  private static <T> void payback(Map<Class<T>, List<T>> pool, T codec) {
    if (codec != null) {
      @SuppressWarnings("unchecked") // getClass() of a T is a Class<T> here.
      Class<T> codecClass = (Class<T>) codec.getClass();
      synchronized (pool) {
        List<T> codecList = pool.get(codecClass);
        if (codecList == null) {
          codecList = new ArrayList<T>();
          pool.put(codecClass, codecList);
        }
        codecList.add(codec);
      }
    }
  }

  /**
   * Get a {@link Compressor} for the given {@link CompressionCodec} from the
   * pool or a new one.
   *
   * @param codec
   *          the <code>CompressionCodec</code> for which to get the
   *          <code>Compressor</code>
   * @param conf the <code>Configuration</code> object which contains confs for creating or reinit the compressor
   * @return <code>Compressor</code> for the given <code>CompressionCodec</code>
   *         from the pool or a new one
   */
  public static Compressor getCompressor(CompressionCodec codec, Configuration conf) {
    Compressor compressor = borrow(COMPRESSOR_POOL, codec.getCompressorType());
    if (compressor == null) {
      compressor = codec.createCompressor();
      LOG.info("Got brand-new compressor [" + codec.getDefaultExtension() + "]");
    } else {
      // Recycled compressors must be re-initialized with the caller's conf
      // (conf may be null when coming through getCompressor(codec)).
      compressor.reinit(conf);
      if (LOG.isDebugEnabled()) {
        LOG.debug("Got recycled compressor");
      }
    }
    return compressor;
  }

  /** Same as {@link #getCompressor(CompressionCodec, Configuration)} with a null conf. */
  public static Compressor getCompressor(CompressionCodec codec) {
    return getCompressor(codec, null);
  }

  /**
   * Get a {@link Decompressor} for the given {@link CompressionCodec} from the
   * pool or a new one.
   *
   * @param codec
   *          the <code>CompressionCodec</code> for which to get the
   *          <code>Decompressor</code>
   * @return <code>Decompressor</code> for the given
   *         <code>CompressionCodec</code> from the pool or a new one
   */
  public static Decompressor getDecompressor(CompressionCodec codec) {
    Decompressor decompressor = borrow(DECOMPRESSOR_POOL, codec.getDecompressorType());
    if (decompressor == null) {
      decompressor = codec.createDecompressor();
      LOG.info("Got brand-new decompressor [" + codec.getDefaultExtension() + "]");
    } else {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Got recycled decompressor");
      }
    }
    return decompressor;
  }

  /**
   * Return the {@link Compressor} to the pool.
   *
   * @param compressor
   *          the <code>Compressor</code> to be returned to the pool
   */
  public static void returnCompressor(Compressor compressor) {
    if (compressor == null) {
      return;
    }
    // if the compressor can't be reused, don't pool it.
    if (compressor.getClass().isAnnotationPresent(DoNotPool.class)) {
      return;
    }
    compressor.reset();
    payback(COMPRESSOR_POOL, compressor);
  }

  /**
   * Return the {@link Decompressor} to the pool.
   *
   * @param decompressor
   *          the <code>Decompressor</code> to be returned to the pool
   */
  public static void returnDecompressor(Decompressor decompressor) {
    if (decompressor == null) {
      return;
    }
    // if the decompressor can't be reused, don't pool it.
    if (decompressor.getClass().isAnnotationPresent(DoNotPool.class)) {
      return;
    }
    decompressor.reset();
    payback(DECOMPRESSOR_POOL, decompressor);
  }

  private CodecPool() {
    // prevent instantiation
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/main/java/org/apache/tajo/storage/exception/AlreadyExistsStorageException.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/exception/AlreadyExistsStorageException.java b/tajo-storage/src/main/java/org/apache/tajo/storage/exception/AlreadyExistsStorageException.java
new file mode 100644
index 0000000..bb035a8
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/exception/AlreadyExistsStorageException.java
@@ -0,0 +1,39 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.tajo.storage.exception;

import org.apache.hadoop.fs.Path;

import java.io.IOException;

/**
 * Thrown when a storage location that is about to be created already exists.
 */
public class AlreadyExistsStorageException extends IOException {
  private static final long serialVersionUID = 965518916144019032L;

  /**
   * @param path string form of the path that already exists
   */
  public AlreadyExistsStorageException(String path) {
    // Message typo fixed: "alreay" -> "already".
    super("Error: " + path + " already exists");
  }

  /**
   * @param path the filesystem path that already exists
   */
  public AlreadyExistsStorageException(Path path) {
    this(path.toString());
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/main/java/org/apache/tajo/storage/exception/UnknownCodecException.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/exception/UnknownCodecException.java b/tajo-storage/src/main/java/org/apache/tajo/storage/exception/UnknownCodecException.java
new file mode 100644
index 0000000..a67d1f7
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/exception/UnknownCodecException.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.exception;
+
+public class UnknownCodecException extends Exception {
+
+ private static final long serialVersionUID = 4287230843540404529L;
+
+ public UnknownCodecException() {
+
+ }
+
+ public UnknownCodecException(String message) {
+ super(message);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/main/java/org/apache/tajo/storage/exception/UnknownDataTypeException.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/exception/UnknownDataTypeException.java b/tajo-storage/src/main/java/org/apache/tajo/storage/exception/UnknownDataTypeException.java
new file mode 100644
index 0000000..d18b5a0
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/exception/UnknownDataTypeException.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.exception;
+
+public class UnknownDataTypeException extends Exception {
+
+ private static final long serialVersionUID = -2630390595968966164L;
+
+ public UnknownDataTypeException() {
+
+ }
+
+ public UnknownDataTypeException(String message) {
+ super(message);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/main/java/org/apache/tajo/storage/exception/UnsupportedFileTypeException.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/exception/UnsupportedFileTypeException.java b/tajo-storage/src/main/java/org/apache/tajo/storage/exception/UnsupportedFileTypeException.java
new file mode 100644
index 0000000..8b197d6
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/exception/UnsupportedFileTypeException.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ *
+ */
+package org.apache.tajo.storage.exception;
+
+public class UnsupportedFileTypeException extends RuntimeException {
+ private static final long serialVersionUID = -8160289695849000342L;
+
+ public UnsupportedFileTypeException() {
+ }
+
+ /**
+ * @param message
+ */
+ public UnsupportedFileTypeException(String message) {
+ super(message);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java b/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java
new file mode 100644
index 0000000..ea8bf9f
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java
@@ -0,0 +1,219 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.tajo.storage.fragment;

import com.google.common.base.Objects;
import com.google.gson.annotations.Expose;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.util.TUtil;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import static org.apache.tajo.catalog.proto.CatalogProtos.FileFragmentProto;
import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;

/**
 * A {@link Fragment} backed by a byte range (offset, length) of an HDFS file,
 * optionally annotated with the datanode hosts and disk ids that store it.
 */
public class FileFragment implements Fragment, Comparable<FileFragment>, Cloneable {
  @Expose private String tableName; // required
  @Expose private Path uri; // required
  @Expose private Long startOffset; // required
  @Expose private Long length; // required

  private String[] hosts; // Datanode hostnames
  @Expose private int[] diskIds;

  /**
   * Deserializes a fragment from the raw bytes of a {@code FileFragmentProto}.
   *
   * @throws InvalidProtocolBufferException if {@code raw} is not a valid proto
   */
  public FileFragment(ByteString raw) throws InvalidProtocolBufferException {
    FileFragmentProto.Builder builder = FileFragmentProto.newBuilder();
    builder.mergeFrom(raw);
    // The original code called builder.build() twice; once is enough.
    init(builder.build());
  }

  public FileFragment(String tableName, Path uri, BlockLocation blockLocation, int[] diskIds)
      throws IOException {
    this.set(tableName, uri, blockLocation.getOffset(), blockLocation.getLength(),
        blockLocation.getHosts(), diskIds);
  }

  // Non splittable
  public FileFragment(String tableName, Path uri, long start, long length, String[] hosts) {
    this.set(tableName, uri, start, length, null, null);
    this.hosts = hosts;
  }

  public FileFragment(String fragmentId, Path path, long start, long length) {
    this.set(fragmentId, path, start, length, null, null);
  }

  public FileFragment(FileFragmentProto proto) {
    init(proto);
  }

  /** Populates all fields from a deserialized proto. */
  private void init(FileFragmentProto proto) {
    int[] diskIds = new int[proto.getDiskIdsList().size()];
    int i = 0;
    for (Integer eachValue : proto.getDiskIdsList()) {
      diskIds[i++] = eachValue;
    }
    this.set(proto.getId(), new Path(proto.getPath()),
        proto.getStartOffset(), proto.getLength(),
        proto.getHostsList().toArray(new String[]{}),
        diskIds);
  }

  private void set(String tableName, Path path, long start,
      long length, String[] hosts, int[] diskIds) {
    this.tableName = tableName;
    this.uri = path;
    this.startOffset = start;
    this.length = length;
    this.hosts = hosts;
    this.diskIds = diskIds;
  }

  /**
   * Get the list of hosts (hostname) hosting this block.
   * Never returns null; lazily replaces a missing host list with an empty one.
   */
  public String[] getHosts() {
    if (hosts == null) {
      this.hosts = new String[0];
    }
    return hosts;
  }

  /**
   * Get the list of Disk Ids.
   * Unknown disk is -1. Others 0 ~ N. Lazily initialized to -1 per host.
   */
  public int[] getDiskIds() {
    if (diskIds == null) {
      this.diskIds = new int[getHosts().length];
      Arrays.fill(this.diskIds, -1);
    }
    return diskIds;
  }

  public String getTableName() {
    return this.tableName;
  }

  public Path getPath() {
    return this.uri;
  }

  public void setPath(Path path) {
    this.uri = path;
  }

  /** Returns the start offset of the byte range. */
  public Long getStartKey() {
    return this.startOffset;
  }

  /** Returns the length of the byte range (despite the name, not an end offset). */
  public Long getEndKey() {
    return this.length;
  }

  /**
   * The offset range of tablets <b>MUST NOT</b> be overlapped.
   *
   * NOTE(review): returning -1 for fragments of different paths makes this
   * ordering asymmetric (inconsistent with the compareTo contract); behavior
   * is kept as-is because callers may rely on it.
   *
   * @param t the fragment to compare against
   * @return If the table paths are not same, return -1.
   */
  @Override
  public int compareTo(FileFragment t) {
    if (getPath().equals(t.getPath())) {
      // Long.compareTo avoids the overflow risk of subtracting raw offsets.
      return this.getStartKey().compareTo(t.getStartKey());
    } else {
      return -1;
    }
  }

  @Override
  public boolean equals(Object o) {
    if (o instanceof FileFragment) {
      FileFragment t = (FileFragment) o;
      if (getPath().equals(t.getPath())
          && TUtil.checkEquals(t.getStartKey(), this.getStartKey())
          && TUtil.checkEquals(t.getEndKey(), this.getEndKey())) {
        return true;
      }
    }
    return false;
  }

  @Override
  public int hashCode() {
    return Objects.hashCode(tableName, uri, startOffset, length);
  }

  public Object clone() throws CloneNotSupportedException {
    FileFragment frag = (FileFragment) super.clone();
    frag.tableName = tableName;
    frag.uri = uri;
    frag.diskIds = diskIds;
    frag.hosts = hosts;

    return frag;
  }

  @Override
  public String toString() {
    // Fixed: the path value was missing its opening quote, producing
    // unbalanced JSON-ish output.
    return "\"fragment\": {\"id\": \"" + tableName + "\", \"path\": \""
        + getPath() + "\", \"start\": " + this.getStartKey() + ",\"length\": "
        + getEndKey() + "}";
  }

  /** Serializes this fragment into the generic FragmentProto envelope. */
  public FragmentProto getProto() {
    FileFragmentProto.Builder builder = FileFragmentProto.newBuilder();
    builder.setId(this.tableName);
    builder.setStartOffset(this.startOffset);
    builder.setLength(this.length);
    builder.setPath(this.uri.toString());
    if (diskIds != null) {
      List<Integer> idList = new ArrayList<Integer>();
      for (int eachId : diskIds) {
        idList.add(eachId);
      }
      builder.addAllDiskIds(idList);
    }

    if (hosts != null) {
      builder.addAllHosts(TUtil.newList(hosts));
    }

    FragmentProto.Builder fragmentBuilder = FragmentProto.newBuilder();
    fragmentBuilder.setId(this.tableName);
    fragmentBuilder.setContents(builder.buildPartial().toByteString());
    return fragmentBuilder.build();
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/Fragment.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/Fragment.java b/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/Fragment.java
new file mode 100644
index 0000000..3f9c160
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/Fragment.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.fragment;
+
+import org.apache.tajo.common.ProtoObject;
+
+import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
+
+public interface Fragment extends ProtoObject<FragmentProto> {
+
+ public abstract String getTableName();
+
+ @Override
+ public abstract FragmentProto getProto();
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FragmentConvertor.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FragmentConvertor.java b/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FragmentConvertor.java
new file mode 100644
index 0000000..3bfe96f
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FragmentConvertor.java
@@ -0,0 +1,123 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.tajo.storage.fragment;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.protobuf.ByteString;
import org.apache.hadoop.conf.Configuration;
import org.apache.tajo.annotation.ThreadSafe;

import java.io.IOException;
import java.lang.reflect.Constructor;
import java.util.List;
import java.util.Map;

import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
import static org.apache.tajo.catalog.proto.CatalogProtos.StoreType;

/**
 * Converts between {@link FragmentProto} messages and concrete
 * {@link Fragment} implementations, resolving the implementation class from
 * the configuration key {@code tajo.storage.fragment.<store>.class}.
 */
@ThreadSafe
public class FragmentConvertor {
  /**
   * Cache of fragment classes
   */
  protected static final Map<String, Class<? extends Fragment>> CACHED_FRAGMENT_CLASSES = Maps.newConcurrentMap();
  /**
   * Cache of constructors for each class.
   */
  private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE = Maps.newConcurrentMap();
  /**
   * default parameter for all constructors
   */
  private static final Class<?>[] DEFAULT_FRAGMENT_PARAMS = { ByteString.class };

  /**
   * Resolves the fragment class configured for the given store type.
   *
   * @throws IOException if no fragment class is configured for the store type
   */
  public static Class<? extends Fragment> getFragmentClass(Configuration conf, StoreType storeType)
      throws IOException {
    String handlerName = storeType.name().toLowerCase();
    Class<? extends Fragment> fragmentClass = CACHED_FRAGMENT_CLASSES.get(handlerName);
    if (fragmentClass == null) {
      fragmentClass = conf.getClass(
          String.format("tajo.storage.fragment.%s.class", handlerName), null, Fragment.class);
      // Bug fix: the cache is a ConcurrentHashMap, which rejects null values
      // with a NullPointerException. Only cache a successful lookup so that a
      // missing configuration reaches the IOException below instead of an NPE.
      if (fragmentClass != null) {
        CACHED_FRAGMENT_CLASSES.put(handlerName, fragmentClass);
      }
    }

    if (fragmentClass == null) {
      throw new IOException("No such a fragment for " + storeType.name());
    }

    return fragmentClass;
  }

  /**
   * Instantiates a fragment of the given class from a serialized proto, using
   * the cached {@code (ByteString)} constructor.
   */
  public static <T extends Fragment> T convert(Class<T> clazz, FragmentProto fragment) {
    T result;
    try {
      @SuppressWarnings("unchecked") // cache is keyed by clazz itself.
      Constructor<T> constructor = (Constructor<T>) CONSTRUCTOR_CACHE.get(clazz);
      if (constructor == null) {
        constructor = clazz.getDeclaredConstructor(DEFAULT_FRAGMENT_PARAMS);
        constructor.setAccessible(true);
        CONSTRUCTOR_CACHE.put(clazz, constructor);
      }
      result = constructor.newInstance(new Object[]{fragment.getContents()});
    } catch (Exception e) {
      throw new RuntimeException(e);
    }

    return result;
  }

  /** Resolves the fragment class for the store type, then deserializes. */
  public static <T extends Fragment> T convert(Configuration conf, StoreType storeType, FragmentProto fragment)
      throws IOException {
    @SuppressWarnings("unchecked")
    Class<T> fragmentClass = (Class<T>) getFragmentClass(conf, storeType);
    // getFragmentClass never returns null (it throws instead), so no extra
    // null check is needed here.
    return convert(fragmentClass, fragment);
  }

  /** Deserializes each proto into a fragment of the given class. */
  public static <T extends Fragment> List<T> convert(Class<T> clazz, FragmentProto... fragments)
      throws IOException {
    List<T> list = Lists.newArrayList();
    for (FragmentProto proto : fragments) {
      list.add(convert(clazz, proto));
    }
    return list;
  }

  /** Deserializes each proto, resolving the class from the configuration. */
  public static <T extends Fragment> List<T> convert(Configuration conf, StoreType storeType,
      FragmentProto... fragments) throws IOException {
    List<T> list = Lists.newArrayList();
    for (FragmentProto proto : fragments) {
      list.add((T) convert(conf, storeType, proto));
    }
    return list;
  }

  /** Serializes each fragment into its proto form. */
  public static List<FragmentProto> toFragmentProtoList(Fragment... fragments) {
    List<FragmentProto> list = Lists.newArrayList();
    for (Fragment fragment : fragments) {
      list.add(fragment.getProto());
    }
    return list;
  }

  /** Serializes each fragment into its proto form, as an array. */
  public static FragmentProto[] toFragmentProtoArray(Fragment... fragments) {
    List<FragmentProto> list = toFragmentProtoList(fragments);
    return list.toArray(new FragmentProto[list.size()]);
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/main/java/org/apache/tajo/storage/index/IndexMethod.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/index/IndexMethod.java b/tajo-storage/src/main/java/org/apache/tajo/storage/index/IndexMethod.java
new file mode 100644
index 0000000..74be7ff
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/index/IndexMethod.java
@@ -0,0 +1,32 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.tajo.storage.index;

import org.apache.hadoop.fs.Path;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.storage.TupleComparator;

import java.io.IOException;

/**
 * Factory for a concrete index implementation: supplies writers that build
 * an index file and readers that query one.
 */
public interface IndexMethod {
  /**
   * Creates a writer that builds an index file at {@code fileName}.
   *
   * @param fileName path of the index file to create
   * @param level index level; exact semantics are implementation-specific — confirm in the implementing class
   * @param keySchema schema of the key columns stored in the index
   * @param comparator ordering of the index keys
   * @throws IOException if the index file cannot be created
   */
  IndexWriter getIndexWriter(final Path fileName, int level, Schema keySchema,
      TupleComparator comparator) throws IOException;

  /**
   * Opens a reader over the existing index file at {@code fileName}.
   *
   * @param fileName path of the index file to read
   * @param keySchema schema of the key columns stored in the index
   * @param comparator ordering of the index keys
   * @throws IOException if the index file cannot be opened
   */
  IndexReader getIndexReader(final Path fileName, Schema keySchema,
      TupleComparator comparator) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/main/java/org/apache/tajo/storage/index/IndexReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/index/IndexReader.java b/tajo-storage/src/main/java/org/apache/tajo/storage/index/IndexReader.java
new file mode 100644
index 0000000..7baf7aa
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/index/IndexReader.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.index;
+
+import org.apache.tajo.storage.Tuple;
+
+import java.io.IOException;
+
+public interface IndexReader {
+
+ /**
+ * Find the offset corresponding to key which is equal to a given key.
+ *
+ * @param key
+ * @return
+ * @throws IOException
+ */
+ public long find(Tuple key) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/main/java/org/apache/tajo/storage/index/IndexWriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/index/IndexWriter.java b/tajo-storage/src/main/java/org/apache/tajo/storage/index/IndexWriter.java
new file mode 100644
index 0000000..04738f8
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/index/IndexWriter.java
@@ -0,0 +1,33 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

/**
 * Write-side contract of an index: records (key, offset) entries and must be
 * closed to finish the index file.
 */
package org.apache.tajo.storage.index;

import org.apache.tajo.storage.Tuple;

import java.io.IOException;

public abstract class IndexWriter {

  /**
   * Records one index entry mapping {@code key} to {@code offset}.
   *
   * @param key the key tuple of the entry
   * @param offset the data offset the key points to
   * @throws IOException if the entry cannot be written
   */
  public abstract void write(Tuple key, long offset) throws IOException;

  /**
   * Finishes and releases the index file; call exactly once after all writes.
   *
   * @throws IOException if the index cannot be flushed or closed
   */
  public abstract void close() throws IOException;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/main/java/org/apache/tajo/storage/index/OrderIndexReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/index/OrderIndexReader.java b/tajo-storage/src/main/java/org/apache/tajo/storage/index/OrderIndexReader.java
new file mode 100644
index 0000000..688bbc7
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/index/OrderIndexReader.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ *
+ */
+package org.apache.tajo.storage.index;
+
+import org.apache.tajo.storage.Tuple;
+
+import java.io.IOException;
+
/**
 * An {@link IndexReader} whose keys are stored in sorted order, allowing
 * range-style access: position on the first entry matching a key, then scan
 * forward entry by entry via {@link #next()}.
 */
public interface OrderIndexReader extends IndexReader {
  /**
   * Find the offset corresponding to key which is equal to or greater than
   * a given key.
   *
   * @param key to find
   * @param nextKey if true, skip the entry matching {@code key} and position
   *                on the entry immediately after it
   * @return the offset of the located entry, or -1 if no such entry exists
   * @throws IOException if the index cannot be read
   */
  public long find(Tuple key, boolean nextKey) throws IOException;

  /**
   * Return the next offset from the latest find or next offset
   *
   * @return the next offset, or -1 when the end of the index is reached
   * @throws IOException if the index cannot be read
   */
  public long next() throws IOException;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java b/tajo-storage/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java
new file mode 100644
index 0000000..bc8fe96
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java
@@ -0,0 +1,623 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.index.bst;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.proto.CatalogProtos.SchemaProto;
+import org.apache.tajo.storage.RowStoreUtil;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.TupleComparator;
+import org.apache.tajo.storage.index.IndexMethod;
+import org.apache.tajo.storage.index.IndexWriter;
+import org.apache.tajo.storage.index.OrderIndexReader;
+import org.apache.tajo.util.Bytes;
+
+import java.io.Closeable;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.Set;
+import java.util.TreeMap;
+
+import static org.apache.tajo.index.IndexProtos.TupleComparatorProto;
+
+/**
+ * This is two-level binary search tree index. This is one of the value-list
+ * index structure. Thus, it is inefficient in the case where
+ * the many of the values are same. Also, the BST shows the fast performance
+ * when the selectivity of rows to be retrieved is less than 5%.
+ * BSTIndexWriter is not thread-safe, whereas BSTIndexReader is thread-safe.
+ */
+public class BSTIndex implements IndexMethod {
  private static final Log LOG = LogFactory.getLog(BSTIndex.class);

  /** Index layout: a single flat file of sorted keys. */
  public static final int ONE_LEVEL_INDEX = 1;
  /** Index layout: a sparse root index file over blocks of sorted leaf keys. */
  public static final int TWO_LEVEL_INDEX = 2;

  private final Configuration conf;

  public BSTIndex(final Configuration conf) {
    this.conf = conf;
  }

  /**
   * Creates a writer for a new BST index file.
   *
   * @param fileName path of the index file to create
   * @param level {@link #ONE_LEVEL_INDEX} or {@link #TWO_LEVEL_INDEX}
   * @param keySchema schema of the index keys
   * @param comparator ordering of the index keys
   * @throws IOException declared by the factory contract
   */
  @Override
  public BSTIndexWriter getIndexWriter(final Path fileName, int level, Schema keySchema,
                                       TupleComparator comparator) throws IOException {
    return new BSTIndexWriter(fileName, level, keySchema, comparator);
  }

  /**
   * Creates a reader with an explicitly supplied key schema and comparator.
   */
  @Override
  public BSTIndexReader getIndexReader(Path fileName, Schema keySchema,
                                       TupleComparator comparator) throws IOException {
    return new BSTIndexReader(fileName, keySchema, comparator);
  }

  /**
   * Creates a reader that loads the key schema and comparator from the index
   * file header when {@code open()} is called.
   */
  public BSTIndexReader getIndexReader(Path fileName) throws IOException {
    return new BSTIndexReader(fileName);
  }
+
+ public class BSTIndexWriter extends IndexWriter implements Closeable {
+ private FSDataOutputStream out;
+ private FileSystem fs;
+ private int level;
+ private int loadNum = 4096;
+ private Path fileName;
+
+ private final Schema keySchema;
+ private final TupleComparator compartor;
+ private final KeyOffsetCollector collector;
+ private KeyOffsetCollector rootCollector;
+
+ private Tuple firstKey;
+ private Tuple lastKey;
+
+
+ // private Tuple lastestKey = null;
+
+ /**
+ * constructor
+ *
+ * @param level
+ * : IndexCreater.ONE_LEVEL_INDEX or IndexCreater.TWO_LEVEL_INDEX
+ * @throws IOException
+ */
+ public BSTIndexWriter(final Path fileName, int level, Schema keySchema,
+ TupleComparator comparator) throws IOException {
+ this.fileName = fileName;
+ this.level = level;
+ this.keySchema = keySchema;
+ this.compartor = comparator;
+ this.collector = new KeyOffsetCollector(comparator);
+ }
+
+ public void setLoadNum(int loadNum) {
+ this.loadNum = loadNum;
+ }
+
+ public void open() throws IOException {
+ fs = fileName.getFileSystem(conf);
+ if (fs.exists(fileName)) {
+ throw new IOException("ERROR: index file (" + fileName + " already exists");
+ }
+ out = fs.create(fileName);
+ }
+
+ @Override
+ public void write(Tuple key, long offset) throws IOException {
+ if (firstKey == null || compartor.compare(key, firstKey) < 0) {
+ firstKey = key;
+ }
+ if (lastKey == null || compartor.compare(lastKey, key) < 0) {
+ lastKey = key;
+ }
+
+ collector.put(key, offset);
+ }
+
+ public TupleComparator getComparator() {
+ return this.compartor;
+ }
+
+ public void flush() throws IOException {
+ out.flush();
+ }
+
+ public void writeHeader(int entryNum) throws IOException {
+ // schema
+ byte [] schemaBytes = keySchema.getProto().toByteArray();
+ out.writeInt(schemaBytes.length);
+ out.write(schemaBytes);
+
+ // comparator
+ byte [] comparatorBytes = compartor.getProto().toByteArray();
+ out.writeInt(comparatorBytes.length);
+ out.write(comparatorBytes);
+
+ // level
+ out.writeInt(this.level);
+ // entry
+ out.writeInt(entryNum);
+ if (entryNum > 0) {
+ byte [] minBytes = RowStoreUtil.RowStoreEncoder.toBytes(keySchema,
+ firstKey);
+ out.writeInt(minBytes.length);
+ out.write(minBytes);
+ byte [] maxBytes = RowStoreUtil.RowStoreEncoder.toBytes(keySchema,
+ lastKey);
+ out.writeInt(maxBytes.length);
+ out.write(maxBytes);
+ }
+ out.flush();
+ }
+
+ public void close() throws IOException {
+ /* two level initialize */
+ if (this.level == TWO_LEVEL_INDEX) {
+ rootCollector = new KeyOffsetCollector(this.compartor);
+ }
+
+ /* data writing phase */
+ TreeMap<Tuple, LinkedList<Long>> keyOffsetMap = collector.getMap();
+ Set<Tuple> keySet = keyOffsetMap.keySet();
+
+ int entryNum = keySet.size();
+ writeHeader(entryNum);
+
+ int loadCount = this.loadNum - 1;
+ for (Tuple key : keySet) {
+
+ if (this.level == TWO_LEVEL_INDEX) {
+ loadCount++;
+ if (loadCount == this.loadNum) {
+ rootCollector.put(key, out.getPos());
+ loadCount = 0;
+ }
+ }
+ /* key writing */
+ byte[] buf = RowStoreUtil.RowStoreEncoder.toBytes(this.keySchema, key);
+ out.writeInt(buf.length);
+ out.write(buf);
+
+ /**/
+ LinkedList<Long> offsetList = keyOffsetMap.get(key);
+ /* offset num writing */
+ int offsetSize = offsetList.size();
+ out.writeInt(offsetSize);
+ /* offset writing */
+ for (Long offset : offsetList) {
+ out.writeLong(offset);
+ }
+ }
+
+ out.flush();
+ out.close();
+ keySet.clear();
+ collector.clear();
+
+ FSDataOutputStream rootOut = null;
+ /* root index creating phase */
+ if (this.level == TWO_LEVEL_INDEX) {
+ TreeMap<Tuple, LinkedList<Long>> rootMap = rootCollector.getMap();
+ keySet = rootMap.keySet();
+
+ rootOut = fs.create(new Path(fileName + ".root"));
+ rootOut.writeInt(this.loadNum);
+ rootOut.writeInt(keySet.size());
+
+ /* root key writing */
+ for (Tuple key : keySet) {
+ byte[] buf = RowStoreUtil.RowStoreEncoder.toBytes(keySchema, key);
+ rootOut.writeInt(buf.length);
+ rootOut.write(buf);
+
+ LinkedList<Long> offsetList = rootMap.get(key);
+ if (offsetList.size() > 1 || offsetList.size() == 0) {
+ throw new IOException("Why root index doen't have one offset?");
+ }
+ rootOut.writeLong(offsetList.getFirst());
+
+ }
+ rootOut.flush();
+ rootOut.close();
+
+ keySet.clear();
+ rootCollector.clear();
+ }
+ }
+
+ private class KeyOffsetCollector {
+ private TreeMap<Tuple, LinkedList<Long>> map;
+
+ public KeyOffsetCollector(TupleComparator comparator) {
+ map = new TreeMap<Tuple, LinkedList<Long>>(comparator);
+ }
+
+ public void put(Tuple key, long offset) {
+ if (map.containsKey(key)) {
+ map.get(key).add(offset);
+ } else {
+ LinkedList<Long> list = new LinkedList<Long>();
+ list.add(offset);
+ map.put(key, list);
+ }
+ }
+
+ public TreeMap<Tuple, LinkedList<Long>> getMap() {
+ return this.map;
+ }
+
+ public void clear() {
+ this.map.clear();
+ }
+ }
+ }
+
+ /**
+ * BSTIndexReader is thread-safe.
+ */
+ public class BSTIndexReader implements OrderIndexReader , Closeable{
+ private Path fileName;
+ private Schema keySchema;
+ private TupleComparator comparator;
+
+ private FileSystem fs;
+ private FSDataInputStream indexIn;
+ private FSDataInputStream subIn;
+
+ private int level;
+ private int entryNum;
+ private int loadNum = -1;
+ private Tuple firstKey;
+ private Tuple lastKey;
+
+ // the cursors of BST
+ private int rootCursor;
+ private int keyCursor;
+ private int offsetCursor;
+
+ // mutex
+ private final Object mutex = new Object();
+
+ /**
+ *
+ * @param fileName
+ * @param keySchema
+ * @param comparator
+ * @throws IOException
+ */
+ public BSTIndexReader(final Path fileName, Schema keySchema, TupleComparator comparator) throws IOException {
+ this.fileName = fileName;
+ this.keySchema = keySchema;
+ this.comparator = comparator;
+ }
+
+ public BSTIndexReader(final Path fileName) throws IOException {
+ this.fileName = fileName;
+ }
+
+ public Schema getKeySchema() {
+ return this.keySchema;
+ }
+
+ public TupleComparator getComparator() {
+ return this.comparator;
+ }
+
+ private void readHeader() throws IOException {
+ // schema
+ int schemaByteSize = indexIn.readInt();
+ byte [] schemaBytes = new byte[schemaByteSize];
+ Bytes.readFully(indexIn, schemaBytes, 0, schemaByteSize);
+
+ SchemaProto.Builder builder = SchemaProto.newBuilder();
+ builder.mergeFrom(schemaBytes);
+ SchemaProto proto = builder.build();
+ this.keySchema = new Schema(proto);
+
+ // comparator
+ int compByteSize = indexIn.readInt();
+ byte [] compBytes = new byte[compByteSize];
+ Bytes.readFully(indexIn, compBytes, 0, compByteSize);
+
+ TupleComparatorProto.Builder compProto = TupleComparatorProto.newBuilder();
+ compProto.mergeFrom(compBytes);
+ this.comparator = new TupleComparator(compProto.build());
+
+ // level
+ this.level = indexIn.readInt();
+ // entry
+ this.entryNum = indexIn.readInt();
+ if (entryNum > 0) { // if there is no any entry, do not read firstKey/lastKey values
+ byte [] minBytes = new byte[indexIn.readInt()];
+ Bytes.readFully(indexIn, minBytes, 0, minBytes.length);
+ this.firstKey = RowStoreUtil.RowStoreDecoder.toTuple(keySchema, minBytes);
+
+ byte [] maxBytes = new byte[indexIn.readInt()];
+ Bytes.readFully(indexIn, maxBytes, 0, maxBytes.length);
+ this.lastKey = RowStoreUtil.RowStoreDecoder.toTuple(keySchema, maxBytes);
+ }
+ }
+
+ public void open()
+ throws IOException {
+ /* init the index file */
+ fs = fileName.getFileSystem(conf);
+ if (!fs.exists(fileName)) {
+ throw new FileNotFoundException("ERROR: does not exist " + fileName.toString());
+ }
+
+ indexIn = fs.open(this.fileName);
+ readHeader();
+ fillData();
+ }
+
+ private void fillData() throws IOException {
+ /* load on memory */
+ if (this.level == TWO_LEVEL_INDEX) {
+
+ Path rootPath = new Path(this.fileName + ".root");
+ if (!fs.exists(rootPath)) {
+ throw new FileNotFoundException("root index did not created");
+ }
+
+ subIn = indexIn;
+ indexIn = fs.open(rootPath);
+ /* root index header reading : type => loadNum => indexSize */
+ this.loadNum = indexIn.readInt();
+ this.entryNum = indexIn.readInt();
+ /**/
+ fillRootIndex(entryNum, indexIn);
+
+ } else {
+ fillLeafIndex(entryNum, indexIn, -1);
+ }
+ }
+
+ /**
+ *
+ * @return
+ * @throws IOException
+ */
+ public long find(Tuple key) throws IOException {
+ return find(key, false);
+ }
+
+ @Override
+ public long find(Tuple key, boolean nextKey) throws IOException {
+ synchronized (mutex) {
+ int pos = -1;
+ switch (this.level) {
+ case ONE_LEVEL_INDEX:
+ pos = oneLevBS(key);
+ break;
+ case TWO_LEVEL_INDEX:
+ pos = twoLevBS(key, this.loadNum + 1);
+ break;
+ }
+
+ if (nextKey) {
+ if (pos + 1 >= this.offsetSubIndex.length) {
+ return -1;
+ }
+ keyCursor = pos + 1;
+ offsetCursor = 0;
+ } else {
+ if (correctable) {
+ keyCursor = pos;
+ offsetCursor = 0;
+ } else {
+ return -1;
+ }
+ }
+
+ return this.offsetSubIndex[keyCursor][offsetCursor];
+ }
+ }
+
+ public long next() throws IOException {
+ synchronized (mutex) {
+ if (offsetSubIndex[keyCursor].length - 1 > offsetCursor) {
+ offsetCursor++;
+ } else {
+ if (offsetSubIndex.length - 1 > keyCursor) {
+ keyCursor++;
+ offsetCursor = 0;
+ } else {
+ if (offsetIndex.length -1 > rootCursor) {
+ rootCursor++;
+ fillLeafIndex(loadNum + 1, subIn, this.offsetIndex[rootCursor]);
+ keyCursor = 1;
+ offsetCursor = 0;
+ } else {
+ return -1;
+ }
+ }
+ }
+
+ return this.offsetSubIndex[keyCursor][offsetCursor];
+ }
+ }
+
+ public boolean isCurInMemory() {
+ return (offsetSubIndex[keyCursor].length - 1 >= offsetCursor);
+ }
+
+ private void fillLeafIndex(int entryNum, FSDataInputStream in, long pos)
+ throws IOException {
+ int counter = 0;
+ try {
+ if (pos != -1) {
+ in.seek(pos);
+ }
+ this.dataSubIndex = new Tuple[entryNum];
+ this.offsetSubIndex = new long[entryNum][];
+
+ byte[] buf;
+
+ for (int i = 0; i < entryNum; i++) {
+ counter++;
+ buf = new byte[in.readInt()];
+ Bytes.readFully(in, buf, 0, buf.length);
+ dataSubIndex[i] = RowStoreUtil.RowStoreDecoder.toTuple(keySchema, buf);
+
+ int offsetNum = in.readInt();
+ this.offsetSubIndex[i] = new long[offsetNum];
+ for (int j = 0; j < offsetNum; j++) {
+ this.offsetSubIndex[i][j] = in.readLong();
+ }
+
+ }
+
+ } catch (IOException e) {
+ counter--;
+ if (pos != -1) {
+ in.seek(pos);
+ }
+ this.dataSubIndex = new Tuple[counter];
+ this.offsetSubIndex = new long[counter][];
+
+ byte[] buf;
+ for (int i = 0; i < counter; i++) {
+ buf = new byte[in.readInt()];
+ Bytes.readFully(in, buf, 0, buf.length);
+ dataSubIndex[i] = RowStoreUtil.RowStoreDecoder.toTuple(keySchema, buf);
+
+ int offsetNum = in.readInt();
+ this.offsetSubIndex[i] = new long[offsetNum];
+ for (int j = 0; j < offsetNum; j++) {
+ this.offsetSubIndex[i][j] = in.readLong();
+ }
+
+ }
+ }
+ }
+
+ public Tuple getFirstKey() {
+ return this.firstKey;
+ }
+
+ public Tuple getLastKey() {
+ return this.lastKey;
+ }
+
+ private void fillRootIndex(int entryNum, FSDataInputStream in)
+ throws IOException {
+ this.dataIndex = new Tuple[entryNum];
+ this.offsetIndex = new long[entryNum];
+ Tuple keyTuple;
+ byte[] buf;
+ for (int i = 0; i < entryNum; i++) {
+ buf = new byte[in.readInt()];
+ Bytes.readFully(in, buf, 0, buf.length);
+ keyTuple = RowStoreUtil.RowStoreDecoder.toTuple(keySchema, buf);
+ dataIndex[i] = keyTuple;
+ this.offsetIndex[i] = in.readLong();
+ }
+ }
+
+ /* memory index, only one is used. */
+ private Tuple[] dataIndex = null;
+ private Tuple[] dataSubIndex = null;
+
+ /* offset index */
+ private long[] offsetIndex = null;
+ private long[][] offsetSubIndex = null;
+
+ private boolean correctable = true;
+
+ private int oneLevBS(Tuple key) throws IOException {
+ correctable = true;
+ int pos = binarySearch(this.dataSubIndex, key, 0, this.dataSubIndex.length);
+ return pos;
+ }
+
+ private int twoLevBS(Tuple key, int loadNum) throws IOException {
+ int pos = binarySearch(this.dataIndex, key, 0, this.dataIndex.length);
+ if(pos > 0) {
+ rootCursor = pos;
+ } else {
+ rootCursor = 0;
+ }
+ fillLeafIndex(loadNum, subIn, this.offsetIndex[rootCursor]);
+ pos = binarySearch(this.dataSubIndex, key, 0, this.dataSubIndex.length);
+
+ return pos;
+ }
+
+ private int binarySearch(Tuple[] arr, Tuple key, int startPos, int endPos) {
+ int offset = -1;
+ int start = startPos;
+ int end = endPos;
+
+ //http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6412541
+ int centerPos = (start + end) >>> 1;
+ while (true) {
+ if (comparator.compare(arr[centerPos], key) > 0) {
+ if (centerPos == 0) {
+ correctable = false;
+ break;
+ } else if (comparator.compare(arr[centerPos - 1], key) < 0) {
+ correctable = false;
+ offset = centerPos - 1;
+ break;
+ } else {
+ end = centerPos;
+ centerPos = (start + end) / 2;
+ }
+ } else if (comparator.compare(arr[centerPos], key) < 0) {
+ if (centerPos == arr.length - 1) {
+ correctable = false;
+ offset = centerPos;
+ break;
+ } else if (comparator.compare(arr[centerPos + 1], key) > 0) {
+ correctable = false;
+ offset = centerPos;
+ break;
+ } else {
+ start = centerPos + 1;
+ centerPos = (start + end) / 2;
+ }
+ } else {
+ correctable = true;
+ offset = centerPos;
+ break;
+ }
+ }
+ return offset;
+ }
+
+ @Override
+ public void close() throws IOException {
+ this.indexIn.close();
+ this.subIn.close();
+ }
+
+ @Override
+ public String toString() {
+ return "BSTIndex (" + firstKey + ", " + lastKey + ") " + fileName;
+ }
+ }
+}