You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/12/12 06:59:49 UTC
[15/32] tajo git commit: TAJO-1233: Merge hbase_storage branch to the
master branch. (Hyoungjun Kim via hyunsik)
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java
new file mode 100644
index 0000000..00112e7
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java
@@ -0,0 +1,258 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.Message;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.datum.*;
+import org.apache.tajo.util.Bytes;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * Binary {@link SerializerDeserializer}: INT2, FLOAT4 and FLOAT8 are written as fixed-width
+ * big-endian values, INT4 and INT8 as variable-length longs, and the other supported types
+ * as the raw bytes supplied by the datum.
+ *
+ * A zero-length field stands for NULL: {@link #serialize} writes nothing for a null datum and
+ * {@link #deserialize} returns {@link NullDatum} when {@code length == 0}. An empty TEXT value
+ * is therefore stored as the single marker byte {@link #INVALID_UTF__SINGLE_BYTE}.
+ *
+ * NOTE(review): the write helpers reuse per-instance scratch arrays without synchronization,
+ * so one instance does not look safe to share between threads - confirm with callers.
+ */
+@Deprecated
+public class BinarySerializerDeserializer implements SerializerDeserializer {
+
+ // Marker for an empty TEXT value: 0xBF, a lone UTF-8 continuation byte, which cannot be
+ // the complete encoding of any real text.
+ static final byte[] INVALID_UTF__SINGLE_BYTE = {(byte) Integer.parseInt("10111111", 2)};
+
+ /**
+ * Writes one datum to {@code out} using the encoding selected by the column's data type.
+ *
+ * @param col column whose data type selects the encoding
+ * @param datum value to write; {@code null} or a {@link NullDatum} writes nothing
+ * @param out destination stream
+ * @param nullCharacters not used by this implementation
+ * @return the length recorded for the field; 0 for NULL values and for NULL_TYPE columns
+ * @throws IOException if the stream fails or the column type has no case below
+ */
+ @Override
+ public int serialize(Column col, Datum datum, OutputStream out, byte[] nullCharacters)
+ throws IOException {
+ byte[] bytes;
+ int length = 0;
+ // NULL is represented by writing nothing at all.
+ if (datum == null || datum instanceof NullDatum) {
+ return 0;
+ }
+
+ switch (col.getDataType().getType()) {
+ case BOOLEAN:
+ case BIT:
+ case CHAR:
+ bytes = datum.asByteArray();
+ length = bytes.length;
+ out.write(bytes, 0, length);
+ break;
+ case INT2:
+ length = writeShort(out, datum.asInt2());
+ break;
+ case INT4:
+ // INT4 shares the variable-length long encoding with INT8.
+ length = writeVLong(out, datum.asInt4());
+ break;
+ case INT8:
+ length = writeVLong(out, datum.asInt8());
+ break;
+ case FLOAT4:
+ length = writeFloat(out, datum.asFloat4());
+ break;
+ case FLOAT8:
+ length = writeDouble(out, datum.asFloat8());
+ break;
+ case TEXT: {
+ bytes = datum.asTextBytes();
+ // NOTE(review): the returned length comes from datum.size() while bytes.length bytes
+ // are written; presumably the two are equal for text datums - confirm.
+ length = datum.size();
+ if (length == 0) {
+ // An empty string would be indistinguishable from NULL, so write the marker byte.
+ bytes = INVALID_UTF__SINGLE_BYTE;
+ length = INVALID_UTF__SINGLE_BYTE.length;
+ }
+ out.write(bytes, 0, bytes.length);
+ break;
+ }
+ case BLOB:
+ case INET4:
+ case INET6:
+ bytes = datum.asByteArray();
+ length = bytes.length;
+ out.write(bytes, 0, length);
+ break;
+ case PROTOBUF:
+ ProtobufDatum protobufDatum = (ProtobufDatum) datum;
+ bytes = protobufDatum.asByteArray();
+ length = bytes.length;
+ out.write(bytes, 0, length);
+ break;
+ case NULL_TYPE:
+ break;
+ default:
+ throw new IOException("Does not support type");
+ }
+ return length;
+ }
+
+ /**
+ * Rebuilds a datum from {@code length} bytes of {@code bytes} starting at {@code offset}.
+ *
+ * @param col column whose data type selects the decoding
+ * @param bytes buffer holding the serialized field
+ * @param offset index of the field's first byte
+ * @param length number of bytes in the field; 0 means NULL
+ * @param nullCharacters not used by this implementation
+ * @return the decoded datum, or {@link NullDatum} for an empty field or an unhandled type
+ * @throws IOException declared by the interface; the PROTOBUF branch parses a message
+ */
+ @Override
+ public Datum deserialize(Column col, byte[] bytes, int offset, int length, byte[] nullCharacters) throws IOException {
+ if (length == 0) return NullDatum.get();
+
+ Datum datum;
+ switch (col.getDataType().getType()) {
+ case BOOLEAN:
+ datum = DatumFactory.createBool(bytes[offset]);
+ break;
+ case BIT:
+ datum = DatumFactory.createBit(bytes[offset]);
+ break;
+ case CHAR: {
+ byte[] chars = new byte[length];
+ System.arraycopy(bytes, offset, chars, 0, length);
+ datum = DatumFactory.createChar(chars);
+ break;
+ }
+ case INT2:
+ datum = DatumFactory.createInt2(Bytes.toShort(bytes, offset, length));
+ break;
+ case INT4:
+ // The variable-length decoder works out the field width from its first byte,
+ // so length is not passed.
+ datum = DatumFactory.createInt4((int) Bytes.readVLong(bytes, offset));
+ break;
+ case INT8:
+ datum = DatumFactory.createInt8(Bytes.readVLong(bytes, offset));
+ break;
+ case FLOAT4:
+ datum = DatumFactory.createFloat4(toFloat(bytes, offset, length));
+ break;
+ case FLOAT8:
+ datum = DatumFactory.createFloat8(toDouble(bytes, offset, length));
+ break;
+ case TEXT: {
+ byte[] chars = new byte[length];
+ System.arraycopy(bytes, offset, chars, 0, length);
+
+ // The marker byte written by serialize() maps back to an empty string.
+ if (Bytes.equals(INVALID_UTF__SINGLE_BYTE, chars)) {
+ datum = DatumFactory.createText(new byte[0]);
+ } else {
+ datum = DatumFactory.createText(chars);
+ }
+ break;
+ }
+ case PROTOBUF: {
+ ProtobufDatumFactory factory = ProtobufDatumFactory.get(col.getDataType().getCode());
+ Message.Builder builder = factory.newBuilder();
+ builder.mergeFrom(bytes, offset, length);
+ datum = factory.createDatum(builder);
+ break;
+ }
+ case INET4:
+ datum = DatumFactory.createInet4(bytes, offset, length);
+ break;
+ case BLOB:
+ datum = DatumFactory.createBlob(bytes, offset, length);
+ break;
+ default:
+ // NOTE(review): serialize() writes INET6, but there is no INET6 case here, so such
+ // values come back as NULL - confirm whether that is intended.
+ datum = NullDatum.get();
+ }
+ return datum;
+ }
+
+ // Scratch buffer reused by writeShort().
+ private byte[] shortBytes = new byte[2];
+
+ /**
+ * Writes {@code val} as two bytes, most significant byte first.
+ *
+ * @return the number of bytes written (always 2)
+ */
+ public int writeShort(OutputStream out, short val) throws IOException {
+ shortBytes[0] = (byte) (val >> 8);
+ shortBytes[1] = (byte) val;
+ out.write(shortBytes, 0, 2);
+ return 2;
+ }
+
+ /**
+ * Decodes a big-endian IEEE 754 float from four bytes starting at {@code offset}.
+ *
+ * @throws IllegalArgumentException if {@code length} is not 4
+ */
+ public float toFloat(byte[] bytes, int offset, int length) {
+ Preconditions.checkArgument(length == 4);
+
+ int val = ((bytes[offset] & 0x000000FF) << 24) +
+ ((bytes[offset + 1] & 0x000000FF) << 16) +
+ ((bytes[offset + 2] & 0x000000FF) << 8) +
+ (bytes[offset + 3] & 0x000000FF);
+ return Float.intBitsToFloat(val);
+ }
+
+ // Scratch buffer reused by writeFloat().
+ private byte[] floatBytes = new byte[4];
+
+ /**
+ * Writes the bit pattern of {@code f} as four bytes, most significant byte first.
+ *
+ * @return the number of bytes written (always 4)
+ */
+ public int writeFloat(OutputStream out, float f) throws IOException {
+ int val = Float.floatToIntBits(f);
+
+ floatBytes[0] = (byte) (val >> 24);
+ floatBytes[1] = (byte) (val >> 16);
+ floatBytes[2] = (byte) (val >> 8);
+ floatBytes[3] = (byte) val;
+ out.write(floatBytes, 0, 4);
+ return floatBytes.length;
+ }
+
+ /**
+ * Decodes a big-endian IEEE 754 double from eight bytes starting at {@code offset}.
+ *
+ * @throws IllegalArgumentException if {@code length} is not 8
+ */
+ public double toDouble(byte[] bytes, int offset, int length) {
+ Preconditions.checkArgument(length == 8);
+ long val = ((long) (bytes[offset] & 0x00000000000000FF) << 56) +
+ ((long) (bytes[offset + 1] & 0x00000000000000FF) << 48) +
+ ((long) (bytes[offset + 2] & 0x00000000000000FF) << 40) +
+ ((long) (bytes[offset + 3] & 0x00000000000000FF) << 32) +
+ ((long) (bytes[offset + 4] & 0x00000000000000FF) << 24) +
+ ((long) (bytes[offset + 5] & 0x00000000000000FF) << 16) +
+ ((long) (bytes[offset + 6] & 0x00000000000000FF) << 8) +
+ (long) (bytes[offset + 7] & 0x00000000000000FF);
+ return Double.longBitsToDouble(val);
+ }
+
+ // Scratch buffer reused by writeDouble().
+ private byte[] doubleBytes = new byte[8];
+
+ /**
+ * Writes the bit pattern of {@code d} as eight bytes, most significant byte first.
+ *
+ * @return the number of bytes written (always 8)
+ */
+ public int writeDouble(OutputStream out, double d) throws IOException {
+ long val = Double.doubleToLongBits(d);
+
+ doubleBytes[0] = (byte) (val >> 56);
+ doubleBytes[1] = (byte) (val >> 48);
+ doubleBytes[2] = (byte) (val >> 40);
+ doubleBytes[3] = (byte) (val >> 32);
+ doubleBytes[4] = (byte) (val >> 24);
+ doubleBytes[5] = (byte) (val >> 16);
+ doubleBytes[6] = (byte) (val >> 8);
+ doubleBytes[7] = (byte) val;
+ out.write(doubleBytes, 0, 8);
+ return doubleBytes.length;
+ }
+
+ // Scratch buffer for writeVLong(): one header byte plus at most eight value bytes.
+ private byte[] vLongBytes = new byte[9];
+
+ /**
+ * Encodes {@code l} as a variable-length long into {@code bytes} starting at {@code offset}.
+ *
+ * Values in [-112, 127] take a single byte. Any other value takes a header byte followed by
+ * 1 to 8 value bytes, most significant first: the header is -113..-120 for a non-negative
+ * value and -121..-128 for a negative one, whose one's complement is what gets stored.
+ * NOTE(review): this looks like the layout of Hadoop's WritableUtils.writeVLong - confirm.
+ *
+ * @return the total number of bytes written, between 1 and 9
+ */
+ public static int writeVLongToByteArray(byte[] bytes, int offset, long l) {
+ if (l >= -112 && l <= 127) {
+ bytes[offset] = (byte) l;
+ return 1;
+ }
+
+ int len = -112;
+ if (l < 0) {
+ l ^= -1L; // take one's complement
+ len = -120;
+ }
+
+ // Count the value bytes needed by moving the header one step down per byte.
+ long tmp = l;
+ while (tmp != 0) {
+ tmp = tmp >> 8;
+ len--;
+ }
+
+ bytes[offset++] = (byte) len;
+ // Turn the header back into the number of value bytes that follow.
+ len = (len < -120) ? -(len + 120) : -(len + 112);
+
+ for (int idx = len; idx != 0; idx--) {
+ int shiftbits = (idx - 1) * 8;
+ bytes[offset++] = (byte) ((l & (0xFFL << shiftbits)) >> shiftbits);
+ }
+ return 1 + len;
+ }
+
+ /**
+ * Writes {@code l} to {@code out} in the variable-length form produced by
+ * {@link #writeVLongToByteArray(byte[], int, long)}.
+ *
+ * @return the number of bytes written
+ */
+ public int writeVLong(OutputStream out, long l) throws IOException {
+ int len = writeVLongToByteArray(vLongBytes, 0, l);
+ out.write(vLongBytes, 0, len);
+ return len;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BufferPool.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BufferPool.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BufferPool.java
new file mode 100644
index 0000000..85c79fa
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BufferPool.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.util.internal.PlatformDependent;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/* this class is PooledBuffer holder */
+public class BufferPool {
+
+ private static final PooledByteBufAllocator allocator;
+
+ private BufferPool() {
+ }
+
+ static {
+ //TODO we need determine the default params
+ allocator = new PooledByteBufAllocator(PlatformDependent.directBufferPreferred());
+
+ /* if you are finding memory leak, please enable this line */
+ //ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.ADVANCED);
+ }
+
+ public static long maxDirectMemory() {
+ return PlatformDependent.maxDirectMemory();
+ }
+
+
+ public synchronized static ByteBuf directBuffer(int size) {
+ return allocator.directBuffer(size);
+ }
+
+ /**
+ *
+ * @param size the initial capacity
+ * @param max the max capacity
+ * @return allocated ByteBuf from pool
+ */
+ public static ByteBuf directBuffer(int size, int max) {
+ return allocator.directBuffer(size, max);
+ }
+
+ @InterfaceStability.Unstable
+ public static void forceRelease(ByteBuf buf) {
+ buf.release(buf.refCnt());
+ }
+
+ /**
+ * the ByteBuf will increase to writable size
+ * @param buf
+ * @param minWritableBytes required minimum writable size
+ */
+ public static void ensureWritable(ByteBuf buf, int minWritableBytes) {
+ buf.ensureWritable(minWritableBytes);
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/ByteBufInputChannel.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/ByteBufInputChannel.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/ByteBufInputChannel.java
new file mode 100644
index 0000000..45fb1d8
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/ByteBufInputChannel.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.hadoop.fs.ByteBufferReadable;
+import org.apache.hadoop.hdfs.DFSInputStream;
+import org.apache.hadoop.io.IOUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.ScatteringByteChannel;
+import java.nio.channels.spi.AbstractInterruptibleChannel;
+
+public class ByteBufInputChannel extends AbstractInterruptibleChannel implements ScatteringByteChannel {
+
+ ByteBufferReadable byteBufferReadable;
+ ReadableByteChannel channel;
+ InputStream inputStream;
+
+ public ByteBufInputChannel(InputStream inputStream) {
+ if (inputStream instanceof DFSInputStream && inputStream instanceof ByteBufferReadable) {
+ this.byteBufferReadable = (ByteBufferReadable) inputStream;
+ } else {
+ this.channel = Channels.newChannel(inputStream);
+ }
+
+ this.inputStream = inputStream;
+ }
+
+ @Override
+ public long read(ByteBuffer[] dsts, int offset, int length) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long read(ByteBuffer[] dsts) {
+ return read(dsts, 0, dsts.length);
+ }
+
+ @Override
+ public int read(ByteBuffer dst) throws IOException {
+ if (byteBufferReadable != null) {
+ return byteBufferReadable.read(dst);
+ } else {
+ return channel.read(dst);
+ }
+ }
+
+ @Override
+ protected void implCloseChannel() throws IOException {
+ IOUtils.cleanup(null, channel, inputStream);
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DataLocation.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DataLocation.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DataLocation.java
new file mode 100644
index 0000000..8841a31
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DataLocation.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+public class DataLocation {
+ private String host;
+ private int volumeId;
+
+ public DataLocation(String host, int volumeId) {
+ this.host = host;
+ this.volumeId = volumeId;
+ }
+
+ public String getHost() {
+ return host;
+ }
+
+ public int getVolumeId() {
+ return volumeId;
+ }
+
+ @Override
+ public String toString() {
+ return "DataLocation{" +
+ "host=" + host +
+ ", volumeId=" + volumeId +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DiskDeviceInfo.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DiskDeviceInfo.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DiskDeviceInfo.java
new file mode 100644
index 0000000..2396349
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DiskDeviceInfo.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class DiskDeviceInfo {
+ private int id;
+ private String name;
+
+ private List<DiskMountInfo> mountInfos = new ArrayList<DiskMountInfo>();
+
+ public DiskDeviceInfo(int id) {
+ this.id = id;
+ }
+
+ public int getId() {
+ return id;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ @Override
+ public String toString() {
+ return id + "," + name;
+ }
+
+ public void addMountPath(DiskMountInfo diskMountInfo) {
+ mountInfos.add(diskMountInfo);
+ }
+
+ public List<DiskMountInfo> getMountInfos() {
+ return mountInfos;
+ }
+
+ public void setMountInfos(List<DiskMountInfo> mountInfos) {
+ this.mountInfos = mountInfos;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DiskInfo.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DiskInfo.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DiskInfo.java
new file mode 100644
index 0000000..22f18ba
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DiskInfo.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+/**
+ * Mutable description of one disk partition: id, partition name, mount path and the
+ * capacity/used figures. The class only stores these values and does not interpret them.
+ */
+public class DiskInfo {
+ private int id;
+ private String partitionName;
+ private String mountPath;
+
+ private long capacity;
+ private long used;
+
+ /**
+ * @param id numeric id of the disk
+ * @param partitionName name of the partition
+ */
+ public DiskInfo(int id, String partitionName) {
+ this.partitionName = partitionName;
+ this.id = id;
+ }
+
+ /** @return the disk id */
+ public int getId() {
+ return this.id;
+ }
+
+ public void setId(int id) {
+ this.id = id;
+ }
+
+ /** @return the partition name */
+ public String getPartitionName() {
+ return this.partitionName;
+ }
+
+ public void setPartitionName(String partitionName) {
+ this.partitionName = partitionName;
+ }
+
+ /** @return the mount path, or {@code null} if none has been set */
+ public String getMountPath() {
+ return this.mountPath;
+ }
+
+ public void setMountPath(String mountPath) {
+ this.mountPath = mountPath;
+ }
+
+ /** @return the stored capacity value */
+ public long getCapacity() {
+ return this.capacity;
+ }
+
+ public void setCapacity(long capacity) {
+ this.capacity = capacity;
+ }
+
+ /** @return the stored used-space value */
+ public long getUsed() {
+ return this.used;
+ }
+
+ public void setUsed(long used) {
+ this.used = used;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DiskMountInfo.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DiskMountInfo.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DiskMountInfo.java
new file mode 100644
index 0000000..aadb0e7
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DiskMountInfo.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import com.google.common.base.Objects;
+
+/**
+ * A mount point that belongs to a disk device.
+ *
+ * Instances are ordered by mount path only: deeper paths sort first, then longer paths,
+ * then plain string order. {@link #equals(Object)} and {@link #hashCode()} follow the same
+ * rule, so two instances with the same mount path are equal regardless of their other fields.
+ */
+public class DiskMountInfo implements Comparable<DiskMountInfo> {
+ private String mountPath;
+
+ private long capacity;
+ private long used;
+
+ private int deviceId;
+
+ /**
+ * @param deviceId id of the device this mount point belongs to
+ * @param mountPath path where the device is mounted
+ */
+ public DiskMountInfo(int deviceId, String mountPath) {
+ // The device id used to be dropped here, leaving getDeviceId() stuck at 0.
+ this.deviceId = deviceId;
+ this.mountPath = mountPath;
+ }
+
+ public String getMountPath() {
+ return mountPath;
+ }
+
+ public void setMountPath(String mountPath) {
+ this.mountPath = mountPath;
+ }
+
+ public long getCapacity() {
+ return capacity;
+ }
+
+ public void setCapacity(long capacity) {
+ this.capacity = capacity;
+ }
+
+ public long getUsed() {
+ return used;
+ }
+
+ public void setUsed(long used) {
+ this.used = used;
+ }
+
+ /** @return the id of the owning device, as passed to the constructor */
+ public int getDeviceId() {
+ return deviceId;
+ }
+
+ /** Two mount infos are equal when {@link #compareTo(DiskMountInfo)} reports 0. */
+ @Override
+ public boolean equals(Object obj){
+ if (!(obj instanceof DiskMountInfo)) return false;
+
+ return compareTo((DiskMountInfo) obj) == 0;
+ }
+
+ @Override
+ public int hashCode(){
+ return Objects.hashCode(mountPath);
+ }
+
+ /**
+ * Orders by mount path: more path segments first, then the longer string first, then
+ * natural string order.
+ */
+ @Override
+ public int compareTo(DiskMountInfo other) {
+ String path1 = mountPath;
+ String path2 = other.mountPath;
+
+ // The root path has depth 0; otherwise the depth is the number of '/' separators.
+ int path1Depth = "/".equals(path1) ? 0 : path1.split("/", -1).length - 1 ;
+ int path2Depth = "/".equals(path2) ? 0 : path2.split("/", -1).length - 1 ;
+
+ if(path1Depth > path2Depth) {
+ return -1;
+ } else if(path1Depth < path2Depth) {
+ return 1;
+ } else {
+ int path1Length = path1.length();
+ int path2Length = path2.length();
+
+ if(path1Length < path2Length) {
+ return 1;
+ } else if(path1Length > path2Length) {
+ return -1;
+ } else {
+ return path1.compareTo(path2);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DiskUtil.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DiskUtil.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DiskUtil.java
new file mode 100644
index 0000000..2d68870
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DiskUtil.java
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.server.common.Util;
+
+import java.io.*;
+import java.net.URI;
+import java.util.*;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
+
+/**
+ * Helpers for discovering local disk devices and their mount points, and for reading the
+ * data directories configured for the HDFS DataNode.
+ */
+public class DiskUtil {
+
+ static String UNIX_DISK_DEVICE_PATH = "/proc/partitions";
+
+ public enum OSType {
+ OS_TYPE_UNIX, OS_TYPE_WINXP, OS_TYPE_SOLARIS, OS_TYPE_MAC
+ }
+
+ /** Classifies the running OS from the {@code os.name} system property. */
+ static private OSType getOSType() {
+ String osName = System.getProperty("os.name");
+ if (osName.contains("Windows")
+ && (osName.contains("XP") || osName.contains("2003")
+ || osName.contains("Vista")
+ || osName.contains("Windows_7")
+ || osName.contains("Windows 7") || osName
+ .contains("Windows7"))) {
+ return OSType.OS_TYPE_WINXP;
+ } else if (osName.contains("SunOS") || osName.contains("Solaris")) {
+ return OSType.OS_TYPE_SOLARIS;
+ } else if (osName.contains("Mac")) {
+ return OSType.OS_TYPE_MAC;
+ } else {
+ return OSType.OS_TYPE_UNIX;
+ }
+ }
+
+ /**
+ * Lists the disk devices of this machine. On systems classified as UNIX the devices come
+ * from {@link #UNIX_DISK_DEVICE_PATH} and are enriched with mount points; everywhere else a
+ * single device named "default" is returned.
+ *
+ * @throws IOException if running or reading the {@code mount} command fails
+ */
+ public static List<DiskDeviceInfo> getDiskDeviceInfos() throws IOException {
+ List<DiskDeviceInfo> deviceInfos;
+
+ if(getOSType() == OSType.OS_TYPE_UNIX) {
+ deviceInfos = getUnixDiskDeviceInfos();
+ setDeviceMountInfo(deviceInfos);
+ } else {
+ deviceInfos = getDefaultDiskDeviceInfos();
+ }
+
+ return deviceInfos;
+ }
+
+ /**
+ * Reads the partition table file and returns one device per distinct device name, with
+ * ids assigned in sorted name order. Falls back to the default device list when the file
+ * does not exist; a read failure is reported on stderr and yields whatever was collected.
+ */
+ private static List<DiskDeviceInfo> getUnixDiskDeviceInfos() {
+ List<DiskDeviceInfo> infos = new ArrayList<DiskDeviceInfo>();
+
+ File file = new File(UNIX_DISK_DEVICE_PATH);
+ if(!file.exists()) {
+ System.out.println("No partition file:" + file.getAbsolutePath());
+ return getDefaultDiskDeviceInfos();
+ }
+
+ BufferedReader reader = null;
+ try {
+ reader = new BufferedReader(new InputStreamReader(new FileInputStream(UNIX_DISK_DEVICE_PATH)));
+ String line = null;
+
+ int count = 0;
+ Set<String> deviceNames = new TreeSet<String>();
+ while((line = reader.readLine()) != null) {
+ // The first line is skipped (count == 0), as are blank lines.
+ if(count > 0 && !line.trim().isEmpty()) {
+ String[] tokens = line.trim().split(" +");
+ if(tokens.length == 4) {
+ String deviceName = getDiskDeviceName(tokens[3]);
+ deviceNames.add(deviceName);
+ }
+ }
+ count++;
+ }
+
+ int id = 0;
+ for(String eachDeviceName: deviceNames) {
+ DiskDeviceInfo diskDeviceInfo = new DiskDeviceInfo(id++);
+ diskDeviceInfo.setName(eachDeviceName);
+
+ //TODO set additional info
+ // /sys/block/sda/queue
+ infos.add(diskDeviceInfo);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ if(reader != null) {
+ try {
+ reader.close();
+ } catch (IOException ignored) {
+ // Nothing useful can be done if closing a read-only stream fails.
+ }
+ }
+ }
+
+ return infos;
+ }
+
+ /**
+ * Returns the part of {@code partitionName} that precedes its first decimal digit,
+ * e.g. "sda1" becomes "sda"; a name without digits is returned unchanged.
+ */
+ private static String getDiskDeviceName(String partitionName) {
+ for (int i = 0; i < partitionName.length(); i++) {
+ char c = partitionName.charAt(i);
+ if (c >= '0' && c <= '9') {
+ return partitionName.substring(0, i);
+ }
+ }
+ return partitionName;
+ }
+
+ /** @return a list holding a single device with id 0 and the name "default" */
+ public static List<DiskDeviceInfo> getDefaultDiskDeviceInfos() {
+ DiskDeviceInfo diskDeviceInfo = new DiskDeviceInfo(0);
+ diskDeviceInfo.setName("default");
+
+ List<DiskDeviceInfo> infos = new ArrayList<DiskDeviceInfo>();
+
+ infos.add(diskDeviceInfo);
+
+ return infos;
+ }
+
+
+ /**
+ * Runs the {@code mount} command and attaches every "/dev/<name> on /<path> ..." entry to
+ * the matching device in {@code deviceInfos}. Lines that do not have that shape are ignored.
+ *
+ * @throws IOException if the command cannot be started or its output cannot be read
+ */
+ private static void setDeviceMountInfo(List<DiskDeviceInfo> deviceInfos) throws IOException {
+ Map<String, DiskDeviceInfo> deviceMap = new HashMap<String, DiskDeviceInfo>();
+ for(DiskDeviceInfo eachDevice: deviceInfos) {
+ deviceMap.put(eachDevice.getName(), eachDevice);
+ }
+
+ BufferedReader mountOutput = null;
+ try {
+ Process mountProcess = Runtime.getRuntime().exec("mount");
+ mountOutput = new BufferedReader(new InputStreamReader(
+ mountProcess.getInputStream()));
+ String line;
+ while ((line = mountOutput.readLine()) != null) {
+ int indexStart = line.indexOf(" on /");
+ if (indexStart < 0) {
+ // Not a "<device> on /<path>" line; substring() below would throw on -1.
+ continue;
+ }
+
+ // indexStart + 4 is the '/' that begins the mount path; the path ends at the next
+ // space, or at the end of the line when nothing follows it.
+ int indexEnd = line.indexOf(" ", indexStart + 4);
+ if (indexEnd < 0) {
+ indexEnd = line.length();
+ }
+
+ String deviceName = line.substring(0, indexStart).trim();
+ String[] deviceNameTokens = deviceName.split("/");
+ // Only plain "/dev/<name>" devices are considered.
+ if(deviceNameTokens.length == 3) {
+ if("dev".equals(deviceNameTokens[1])) {
+ String realDeviceName = getDiskDeviceName(deviceNameTokens[2]);
+ String mountPath = new File(line.substring(indexStart + 4, indexEnd)).getAbsolutePath();
+
+ DiskDeviceInfo diskDeviceInfo = deviceMap.get(realDeviceName);
+ if(diskDeviceInfo != null) {
+ diskDeviceInfo.addMountPath(new DiskMountInfo(diskDeviceInfo.getId(), mountPath));
+ }
+ }
+ }
+ }
+ } finally {
+ if (mountOutput != null) {
+ mountOutput.close();
+ }
+ }
+ }
+
+ /** @return the number of data directories configured for the DataNode */
+ public static int getDataNodeStorageSize(){
+ return getStorageDirs().size();
+ }
+
+ /** @return the DataNode data directories from the HDFS configuration, as URIs */
+ public static List<URI> getStorageDirs(){
+ Configuration conf = new HdfsConfiguration();
+ Collection<String> dirNames = conf.getTrimmedStringCollection(DFS_DATANODE_DATA_DIR_KEY);
+ return Util.stringCollectionAsURIs(dirNames);
+ }
+
+ /** Ad-hoc check that prints how "/dev/sde1" is split on '/'. */
+ public static void main(String[] args) throws Exception {
+ System.out.println("/dev/sde1".split("/").length);
+ for(String eachToken: "/dev/sde1".split("/")) {
+ System.out.println(eachToken);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FrameTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FrameTuple.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FrameTuple.java
new file mode 100644
index 0000000..8b7e2e0
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FrameTuple.java
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ *
+ */
+package org.apache.tajo.storage;
+
+import com.google.common.base.Preconditions;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.IntervalDatum;
+import org.apache.tajo.datum.ProtobufDatum;
+import org.apache.tajo.exception.UnsupportedException;
+
+/**
+ * An instance of FrameTuple is an immutable tuple.
+ * It contains two tuples and pretends to be one instance of Tuple for
+ * join qual evaluatations.
+ */
+public class FrameTuple implements Tuple, Cloneable {
+ private int size;
+ private int leftSize;
+
+ private Tuple left;
+ private Tuple right;
+
+ public FrameTuple() {}
+
+ public FrameTuple(Tuple left, Tuple right) {
+ set(left, right);
+ }
+
+ public void set(Tuple left, Tuple right) {
+ this.size = left.size() + right.size();
+ this.left = left;
+ this.leftSize = left.size();
+ this.right = right;
+ }
+
+ @Override
+ public int size() {
+ return size;
+ }
+
+ @Override
+ public boolean contains(int fieldId) {
+ Preconditions.checkArgument(fieldId < size,
+ "Out of field access: " + fieldId);
+
+ if (fieldId < leftSize) {
+ return left.contains(fieldId);
+ } else {
+ return right.contains(fieldId - leftSize);
+ }
+ }
+
+ @Override
+ public boolean isNull(int fieldid) {
+ return get(fieldid).isNull();
+ }
+
+ @Override
+ public boolean isNotNull(int fieldid) {
+ return !isNull(fieldid);
+ }
+
+ @Override
+ public void clear() {
+ throw new UnsupportedException();
+ }
+
+ @Override
+ public void put(int fieldId, Datum value) {
+ throw new UnsupportedException();
+ }
+
+ @Override
+ public void put(int fieldId, Datum[] values) {
+ throw new UnsupportedException();
+ }
+
+ @Override
+ public void put(int fieldId, Tuple tuple) {
+ throw new UnsupportedException();
+ }
+
+ @Override
+ public void setOffset(long offset) {
+ throw new UnsupportedException();
+ }
+
+ @Override
+ public long getOffset() {
+ throw new UnsupportedException();
+ }
+
+ @Override
+ public void put(Datum [] values) {
+ throw new UnsupportedException();
+ }
+
+ @Override
+ public Datum get(int fieldId) {
+ Preconditions.checkArgument(fieldId < size,
+ "Out of field access: " + fieldId);
+
+ if (fieldId < leftSize) {
+ return left.get(fieldId);
+ } else {
+ return right.get(fieldId - leftSize);
+ }
+ }
+
+ @Override
+ public boolean getBool(int fieldId) {
+ return get(fieldId).asBool();
+ }
+
+ @Override
+ public byte getByte(int fieldId) {
+ return get(fieldId).asByte();
+ }
+
+ @Override
+ public char getChar(int fieldId) {
+ return get(fieldId).asChar();
+ }
+
+ @Override
+ public byte [] getBytes(int fieldId) {
+ return get(fieldId).asByteArray();
+ }
+
+ @Override
+ public short getInt2(int fieldId) {
+ return get(fieldId).asInt2();
+ }
+
+ @Override
+ public int getInt4(int fieldId) {
+ return get(fieldId).asInt4();
+ }
+
+ @Override
+ public long getInt8(int fieldId) {
+ return get(fieldId).asInt8();
+ }
+
+ @Override
+ public float getFloat4(int fieldId) {
+ return get(fieldId).asFloat4();
+ }
+
+ @Override
+ public double getFloat8(int fieldId) {
+ return get(fieldId).asFloat8();
+ }
+
+ @Override
+ public String getText(int fieldId) {
+ return get(fieldId).asChars();
+ }
+
+ @Override
+ public ProtobufDatum getProtobufDatum(int fieldId) {
+ return (ProtobufDatum) get(fieldId);
+ }
+
+ @Override
+ public IntervalDatum getInterval(int fieldId) {
+ return (IntervalDatum) get(fieldId);
+ }
+
+ @Override
+ public char [] getUnicodeChars(int fieldId) {
+ return get(fieldId).asUnicodeChars();
+ }
+
+ @Override
+ public Tuple clone() throws CloneNotSupportedException {
+ FrameTuple frameTuple = (FrameTuple) super.clone();
+ frameTuple.set(this.left.clone(), this.right.clone());
+ return frameTuple;
+ }
+
+ @Override
+ public Datum[] getValues(){
+ throw new UnsupportedException();
+ }
+
+ public String toString() {
+ boolean first = true;
+ StringBuilder str = new StringBuilder();
+ str.append("(");
+ for(int i=0; i < size(); i++) {
+ if(contains(i)) {
+ if(first) {
+ first = false;
+ } else {
+ str.append(", ");
+ }
+ str.append(i)
+ .append("=>")
+ .append(get(i));
+ }
+ }
+ str.append(")");
+ return str.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/LazyTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/LazyTuple.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/LazyTuple.java
new file mode 100644
index 0000000..bfbe478
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/LazyTuple.java
@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.IntervalDatum;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.datum.ProtobufDatum;
+import org.apache.tajo.exception.UnsupportedException;
+
+import java.util.Arrays;
+
+ /**
+  * A {@link Tuple} that keeps each field as raw serialized bytes and turns
+  * it into a {@link Datum} only when the field is first read. The resulting
+  * datum is cached in {@code values} and the raw bytes are released.
+  *
+  * <p>{@code textBytes} may hold fewer entries than the schema has columns;
+  * {@link #get(int)} reports such a missing trailing field as
+  * {@link NullDatum}, and every other accessor bounds-checks
+  * {@code textBytes} so that short rows do not raise
+  * {@link ArrayIndexOutOfBoundsException}.
+  */
+ public class LazyTuple implements Tuple, Cloneable {
+   private long offset;
+   /** Cache of already materialized fields; {@code null} means not materialized. */
+   private Datum[] values;
+   /** Raw bytes per field that still await deserialization. */
+   private byte[][] textBytes;
+   private Schema schema;
+   /** Byte sequence handed to the deserializer as the null marker. */
+   private byte[] nullBytes;
+   private SerializerDeserializer serializeDeserialize;
+
+   /**
+    * Creates a lazy tuple that uses {@link TextSerializerDeserializer} and
+    * the text bytes of {@link NullDatum} as the null marker.
+    */
+   public LazyTuple(Schema schema, byte[][] textBytes, long offset) {
+     this(schema, textBytes, offset, NullDatum.get().asTextBytes(), new TextSerializerDeserializer());
+   }
+
+   /**
+    * @param schema    schema whose size determines the number of fields
+    * @param textBytes raw bytes per field
+    * @param offset    offset value reported by {@link #getOffset()}
+    * @param nullBytes null marker passed to the deserializer
+    * @param serde     deserializer used when a field is first read
+    */
+   public LazyTuple(Schema schema, byte[][] textBytes, long offset, byte[] nullBytes, SerializerDeserializer serde) {
+     this.schema = schema;
+     this.textBytes = textBytes;
+     this.values = new Datum[schema.size()];
+     this.offset = offset;
+     this.nullBytes = nullBytes;
+     this.serializeDeserialize = serde;
+   }
+
+   /**
+    * Copy constructor. All fields of {@code tuple} are materialized through
+    * {@link #getValues()}, so the copy starts without pending raw bytes.
+    */
+   public LazyTuple(LazyTuple tuple) {
+     this.values = tuple.getValues();
+     this.offset = tuple.offset;
+     this.schema = tuple.schema;
+     this.textBytes = new byte[size()][];
+     this.nullBytes = tuple.nullBytes;
+     this.serializeDeserialize = tuple.serializeDeserialize;
+   }
+
+   @Override
+   public int size() {
+     return values.length;
+   }
+
+   /**
+    * Returns true if the field has either pending raw bytes or a cached datum.
+    */
+   @Override
+   public boolean contains(int fieldid) {
+     // textBytes can be shorter than values, so its index is guarded.
+     return (fieldid < textBytes.length && textBytes[fieldid] != null) || values[fieldid] != null;
+   }
+
+   @Override
+   public boolean isNull(int fieldid) {
+     return get(fieldid).isNull();
+   }
+
+   @Override
+   public boolean isNotNull(int fieldid) {
+     return !isNull(fieldid);
+   }
+
+   /** Drops every cached datum and all pending raw bytes. */
+   @Override
+   public void clear() {
+     // The two arrays are filled independently because their lengths may differ.
+     Arrays.fill(values, null);
+     Arrays.fill(textBytes, null);
+   }
+
+   /** Releases the pending raw bytes of one field, if the row has an entry for it. */
+   private void discardTextBytes(int fieldId) {
+     if (fieldId < textBytes.length) {
+       textBytes[fieldId] = null;
+     }
+   }
+
+   //////////////////////////////////////////////////////
+   // Setter
+   //////////////////////////////////////////////////////
+   @Override
+   public void put(int fieldId, Datum value) {
+     values[fieldId] = value;
+     discardTextBytes(fieldId);
+   }
+
+   /**
+    * Stores {@code values} into consecutive fields starting at {@code fieldId}.
+    * Only the raw bytes of the overwritten fields are released; fields that
+    * are not touched keep their pending bytes.
+    */
+   @Override
+   public void put(int fieldId, Datum[] values) {
+     for (int i = fieldId, j = 0; j < values.length; i++, j++) {
+       this.values[i] = values[j];
+       discardTextBytes(i);
+     }
+   }
+
+   /** Copies all fields of {@code tuple} into consecutive fields starting at {@code fieldId}. */
+   @Override
+   public void put(int fieldId, Tuple tuple) {
+     for (int i = fieldId, j = 0; j < tuple.size(); i++, j++) {
+       values[i] = tuple.get(j);
+       discardTextBytes(i);
+     }
+   }
+
+   /** Overwrites every field with the leading {@link #size()} entries of {@code values}. */
+   @Override
+   public void put(Datum[] values) {
+     System.arraycopy(values, 0, this.values, 0, size());
+     // Every field was overwritten, so no raw bytes remain pending.
+     this.textBytes = new byte[size()][];
+   }
+
+   //////////////////////////////////////////////////////
+   // Getter
+   //////////////////////////////////////////////////////
+   /**
+    * Returns the datum of a field, deserializing its raw bytes on first access.
+    * A field beyond the end of {@code textBytes}, or one whose
+    * deserialization throws, is cached as {@link NullDatum}. A field with
+    * neither a cached datum nor raw bytes yields {@code null}.
+    */
+   @Override
+   public Datum get(int fieldId) {
+     if (values[fieldId] != null)
+       return values[fieldId];
+     else if (textBytes.length <= fieldId) {
+       values[fieldId] = NullDatum.get(); // split error. (col : 3, separator: ',', row text: "a,")
+     } else if (textBytes[fieldId] != null) {
+       try {
+         values[fieldId] = serializeDeserialize.deserialize(schema.getColumn(fieldId),
+             textBytes[fieldId], 0, textBytes[fieldId].length, nullBytes);
+       } catch (Exception e) {
+         // Best effort: a value that cannot be deserialized is treated as null.
+         values[fieldId] = NullDatum.get();
+       }
+       textBytes[fieldId] = null;
+     } else {
+       //non-projection
+     }
+     return values[fieldId];
+   }
+
+   @Override
+   public void setOffset(long offset) {
+     this.offset = offset;
+   }
+
+   @Override
+   public long getOffset() {
+     return this.offset;
+   }
+
+   @Override
+   public boolean getBool(int fieldId) {
+     return get(fieldId).asBool();
+   }
+
+   @Override
+   public byte getByte(int fieldId) {
+     return get(fieldId).asByte();
+   }
+
+   @Override
+   public char getChar(int fieldId) {
+     return get(fieldId).asChar();
+   }
+
+   @Override
+   public byte [] getBytes(int fieldId) {
+     return get(fieldId).asByteArray();
+   }
+
+   @Override
+   public short getInt2(int fieldId) {
+     return get(fieldId).asInt2();
+   }
+
+   @Override
+   public int getInt4(int fieldId) {
+     return get(fieldId).asInt4();
+   }
+
+   @Override
+   public long getInt8(int fieldId) {
+     return get(fieldId).asInt8();
+   }
+
+   @Override
+   public float getFloat4(int fieldId) {
+     return get(fieldId).asFloat4();
+   }
+
+   @Override
+   public double getFloat8(int fieldId) {
+     return get(fieldId).asFloat8();
+   }
+
+   @Override
+   public String getText(int fieldId) {
+     return get(fieldId).asChars();
+   }
+
+   @Override
+   public ProtobufDatum getProtobufDatum(int fieldId) {
+     throw new UnsupportedException();
+   }
+
+   @Override
+   public IntervalDatum getInterval(int fieldId) {
+     return (IntervalDatum) get(fieldId);
+   }
+
+   @Override
+   public char[] getUnicodeChars(int fieldId) {
+     return get(fieldId).asUnicodeChars();
+   }
+
+   /** Renders the non-null fields as {@code (id=>value, id=>value)}. */
+   @Override
+   public String toString() {
+     boolean first = true;
+     StringBuilder str = new StringBuilder();
+     str.append("(");
+     Datum d;
+     for (int i = 0; i < values.length; i++) {
+       d = get(i);
+       if (d != null) {
+         if (first) {
+           first = false;
+         } else {
+           str.append(", ");
+         }
+         str.append(i)
+             .append("=>")
+             .append(d);
+       }
+     }
+     str.append(")");
+     return str.toString();
+   }
+
+   /**
+    * Hashes the materialized values, the same data {@link #equals(Object)}
+    * compares. Hashing the raw cache instead would let two equal tuples
+    * produce different hash codes whenever one of them still holds
+    * fields that have not been deserialized.
+    */
+   @Override
+   public int hashCode() {
+     return Arrays.hashCode(getValues());
+   }
+
+   /** Materializes every field and returns the datums in a new array. */
+   @Override
+   public Datum[] getValues() {
+     Datum[] datums = new Datum[values.length];
+     for (int i = 0; i < values.length; i++) {
+       datums[i] = get(i);
+     }
+     return datums;
+   }
+
+   /**
+    * Returns a copy whose fields are all materialized; the datum objects
+    * themselves are shared with this tuple.
+    */
+   @Override
+   public Tuple clone() throws CloneNotSupportedException {
+     LazyTuple lazyTuple = (LazyTuple) super.clone();
+
+     lazyTuple.values = getValues(); //shallow copy
+     lazyTuple.textBytes = new byte[size()][];
+     return lazyTuple;
+   }
+
+   /** Two tuples are equal when their materialized value arrays are equal. */
+   @Override
+   public boolean equals(Object obj) {
+     if (obj instanceof Tuple) {
+       Tuple other = (Tuple) obj;
+       return Arrays.equals(getValues(), other.getValues());
+     }
+     return false;
+   }
+ }
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MemoryUtil.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MemoryUtil.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MemoryUtil.java
new file mode 100644
index 0000000..f19b61f
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MemoryUtil.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.tajo.datum.*;
+import org.apache.tajo.util.ClassSize;
+
+ /**
+  * Estimates the memory footprint of a {@link Tuple} from per-datum-class
+  * base sizes obtained through {@link ClassSize#estimateBase(Class, boolean)}.
+  */
+ public class MemoryUtil {
+
+   /** Overhead for a NullDatum */
+   public static final long NULL_DATUM;
+
+   /** Overhead for a BooleanDatum */
+   public static final long BOOL_DATUM;
+
+   /** Overhead for a CharDatum */
+   public static final long CHAR_DATUM;
+
+   /** Overhead for a BitDatum */
+   public static final long BIT_DATUM;
+
+   /** Overhead for an Int2Datum */
+   public static final long INT2_DATUM;
+
+   /** Overhead for an Int4Datum */
+   public static final long INT4_DATUM;
+
+   /** Overhead for an Int8Datum */
+   public static final long INT8_DATUM;
+
+   /** Overhead for a Float4Datum */
+   public static final long FLOAT4_DATUM;
+
+   /** Overhead for a Float8Datum */
+   public static final long FLOAT8_DATUM;
+
+   /** Overhead for a TextDatum */
+   public static final long TEXT_DATUM;
+
+   /** Overhead for a BlobDatum */
+   public static final long BLOB_DATUM;
+
+   /** Overhead for a DateDatum */
+   public static final long DATE_DATUM;
+
+   /** Overhead for a TimeDatum */
+   public static final long TIME_DATUM;
+
+   /** Overhead for a TimestampDatum */
+   public static final long TIMESTAMP_DATUM;
+
+   static {
+     NULL_DATUM = ClassSize.estimateBase(NullDatum.class, false);
+
+     CHAR_DATUM = ClassSize.estimateBase(CharDatum.class, false);
+
+     BOOL_DATUM = ClassSize.estimateBase(BooleanDatum.class, false);
+
+     BIT_DATUM = ClassSize.estimateBase(BitDatum.class, false);
+
+     INT2_DATUM = ClassSize.estimateBase(Int2Datum.class, false);
+
+     INT4_DATUM = ClassSize.estimateBase(Int4Datum.class, false);
+
+     INT8_DATUM = ClassSize.estimateBase(Int8Datum.class, false);
+
+     FLOAT4_DATUM = ClassSize.estimateBase(Float4Datum.class, false);
+
+     FLOAT8_DATUM = ClassSize.estimateBase(Float8Datum.class, false);
+
+     TEXT_DATUM = ClassSize.estimateBase(TextDatum.class, false);
+
+     BLOB_DATUM = ClassSize.estimateBase(BlobDatum.class, false);
+
+     DATE_DATUM = ClassSize.estimateBase(DateDatum.class, false);
+
+     TIME_DATUM = ClassSize.estimateBase(TimeDatum.class, false);
+
+     TIMESTAMP_DATUM = ClassSize.estimateBase(TimestampDatum.class, false);
+   }
+
+   /**
+    * Sums {@code ClassSize.OBJECT} and the estimated size of every datum
+    * returned by {@code tuple.getValues()}. CHAR and TEXT datums also add
+    * their {@code size()}. Datum types without an explicit case below add
+    * nothing to the total.
+    *
+    * @param tuple the tuple to measure
+    * @return the estimated size
+    */
+   public static long calculateMemorySize(Tuple tuple) {
+     long total = ClassSize.OBJECT;
+     for (Datum datum : tuple.getValues()) {
+       switch (datum.type()) {
+
+       case NULL_TYPE:
+         total += NULL_DATUM;
+         break;
+
+       case BOOLEAN:
+         total += BOOL_DATUM;
+         break;
+
+       case BIT:
+         total += BIT_DATUM;
+         break;
+
+       case CHAR:
+         total += CHAR_DATUM + datum.size();
+         break;
+
+       case INT1:
+       case INT2:
+         total += INT2_DATUM;
+         break;
+
+       case INT4:
+         total += INT4_DATUM;
+         break;
+
+       case INT8:
+         total += INT8_DATUM;
+         break;
+
+       case FLOAT4:
+         total += FLOAT4_DATUM;
+         break;
+
+       case FLOAT8:
+         // An 8-byte float is charged with the Float8Datum estimate.
+         total += FLOAT8_DATUM;
+         break;
+
+       case TEXT:
+         total += TEXT_DATUM + datum.size();
+         break;
+
+       case DATE:
+         total += DATE_DATUM;
+         break;
+
+       case TIME:
+         total += TIME_DATUM;
+         break;
+
+       case TIMESTAMP:
+         total += TIMESTAMP_DATUM;
+         break;
+
+       default:
+         break;
+       }
+     }
+
+     return total;
+   }
+ }
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MergeScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MergeScanner.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MergeScanner.java
new file mode 100644
index 0000000..66b3667
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MergeScanner.java
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.statistics.ColumnStats;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.storage.fragment.Fragment;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+ /**
+  * A {@link Scanner} that reads a list of fragments one after another, in
+  * the order given, and presents them as a single stream of tuples. Only
+  * fragments with a positive length are scanned. Read statistics of each
+  * finished fragment scanner are accumulated into {@link #tableStats}.
+  */
+ public class MergeScanner implements Scanner {
+   private Configuration conf;
+   private TableMeta meta;
+   private Schema schema;
+   private List<Fragment> fragments;
+   private Iterator<Fragment> iterator;
+   private Fragment currentFragment;
+   private Scanner currentScanner;
+   private Tuple tuple;
+   private boolean projectable = false;
+   private boolean selectable = false;
+   private Schema target;
+   private float progress;
+   protected TableStats tableStats;
+
+   /** Creates a merge scanner whose target schema equals {@code schema}. */
+   public MergeScanner(Configuration conf, Schema schema, TableMeta meta, List<Fragment> rawFragmentList)
+       throws IOException {
+     this(conf, schema, meta, rawFragmentList, schema);
+   }
+
+   /**
+    * @param conf            configuration; it is cast to {@link TajoConf}
+    * @param schema          schema of the scanned data
+    * @param meta            table meta that selects the storage manager
+    * @param rawFragmentList fragments to scan; zero-length ones are dropped
+    * @param target          schema passed to each fragment scanner as its target
+    * @throws IOException if the first fragment scanner cannot be opened
+    */
+   public MergeScanner(Configuration conf, Schema schema, TableMeta meta, List<Fragment> rawFragmentList,
+                       Schema target)
+       throws IOException {
+     this.conf = conf;
+     this.schema = schema;
+     this.meta = meta;
+     this.target = target;
+
+     this.fragments = new ArrayList<Fragment>();
+
+     long numBytes = 0;
+     for (Fragment eachFileFragment: rawFragmentList) {
+       long fragmentLength = StorageManager.getFragmentLength((TajoConf)conf, eachFileFragment);
+       if (fragmentLength > 0) {
+         numBytes += fragmentLength;
+         fragments.add(eachFileFragment);
+       }
+     }
+
+     // it should keep the input order. Otherwise, it causes wrong result of sort queries.
+     this.reset();
+
+     if (currentScanner != null) {
+       this.projectable = currentScanner.isProjectable();
+       this.selectable = currentScanner.isSelectable();
+     }
+
+     tableStats = new TableStats();
+
+     tableStats.setNumBytes(numBytes);
+     tableStats.setNumBlocks(fragments.size());
+
+     for(Column eachColumn: schema.getColumns()) {
+       ColumnStats columnStats = new ColumnStats(eachColumn);
+       tableStats.addColumnStat(columnStats);
+     }
+   }
+
+   @Override
+   public void init() throws IOException {
+     progress = 0.0f;
+   }
+
+   /**
+    * Returns the next tuple across all fragments, or {@code null} once every
+    * fragment is exhausted.
+    *
+    * <p>When the current fragment scanner runs dry it is closed, its read
+    * statistics are added to {@link #tableStats}, and the following fragment
+    * is opened. This repeats until a tuple is found, so a fragment that
+    * yields no tuple does not end the scan while later fragments remain.
+    * Calling this after {@link #close()} returns {@code null}.
+    */
+   @Override
+   public Tuple next() throws IOException {
+     while (currentScanner != null) {
+       tuple = currentScanner.next();
+       if (tuple != null) {
+         return tuple;
+       }
+
+       // Current fragment is exhausted: close it and collect its statistics.
+       currentScanner.close();
+       TableStats scannerTableStats = currentScanner.getInputStats();
+       if (scannerTableStats != null) {
+         tableStats.setReadBytes(tableStats.getReadBytes() + scannerTableStats.getReadBytes());
+         tableStats.setNumRows(tableStats.getNumRows() + scannerTableStats.getNumRows());
+       }
+       currentScanner = getNextScanner();
+     }
+
+     // No scanner left; make sure a previously returned tuple is not handed out again.
+     tuple = null;
+     return null;
+   }
+
+   /** Closes the current fragment scanner and restarts from the first fragment. */
+   @Override
+   public void reset() throws IOException {
+     this.iterator = fragments.iterator();
+     if (currentScanner != null) {
+       currentScanner.close();
+     }
+     this.currentScanner = getNextScanner();
+   }
+
+   /**
+    * Opens and initializes a scanner for the next fragment.
+    *
+    * @return the new scanner, or {@code null} when no fragment is left
+    */
+   private Scanner getNextScanner() throws IOException {
+     if (iterator.hasNext()) {
+       currentFragment = iterator.next();
+       currentScanner = StorageManager.getStorageManager((TajoConf)conf, meta.getStoreType()).getScanner(meta, schema,
+           currentFragment, target);
+       currentScanner.init();
+       return currentScanner;
+     } else {
+       return null;
+     }
+   }
+
+   @Override
+   public void close() throws IOException {
+     if(currentScanner != null) {
+       currentScanner.close();
+       currentScanner = null;
+     }
+     iterator = null;
+     progress = 1.0f;
+   }
+
+   @Override
+   public boolean isProjectable() {
+     return projectable;
+   }
+
+   @Override
+   public void setTarget(Column[] targets) {
+     this.target = new Schema(targets);
+   }
+
+   @Override
+   public boolean isSelectable() {
+     return selectable;
+   }
+
+   /** Ignored: this scanner does not keep a search condition. */
+   @Override
+   public void setSearchCondition(Object expr) {
+   }
+
+   @Override
+   public Schema getSchema() {
+     return schema;
+   }
+
+   @Override
+   public boolean isSplittable(){
+     return false;
+   }
+
+   /**
+    * While a fragment scanner is open, progress is the bytes read so far
+    * (finished fragments plus the current one) divided by the total number
+    * of fragment bytes. Otherwise the stored progress value is returned.
+    */
+   @Override
+   public float getProgress() {
+     if (currentScanner != null && iterator != null && tableStats.getNumBytes() > 0) {
+       TableStats scannerTableStats = currentScanner.getInputStats();
+       long currentScannerReadBytes = 0;
+       if (scannerTableStats != null) {
+         currentScannerReadBytes = scannerTableStats.getReadBytes();
+       }
+
+       return (float)(tableStats.getReadBytes() + currentScannerReadBytes) / (float)tableStats.getNumBytes();
+     } else {
+       return progress;
+     }
+   }
+
+   @Override
+   public TableStats getInputStats() {
+     return tableStats;
+   }
+ }
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/NullScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/NullScanner.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/NullScanner.java
new file mode 100644
index 0000000..4272228
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/NullScanner.java
@@ -0,0 +1,109 @@
+package org.apache.tajo.storage; /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.storage.fragment.Fragment;
+
+import java.io.IOException;
+
+ /**
+  * A {@link Scanner} that never produces a tuple: {@link #next()} always
+  * returns {@code null}. It reports itself as selectable and splittable but
+  * not projectable, and only tracks a progress value of 0 or 1.
+  */
+ public class NullScanner implements Scanner {
+   protected final Configuration conf;
+   protected final TableMeta meta;
+   protected final Schema schema;
+   protected final Fragment fragment;
+   /** Number of columns in {@link #schema}. */
+   protected final int columnNum;
+   protected Column [] targets;
+   protected float progress;
+   protected TableStats tableStats;
+
+   /** Stores the arguments and creates an empty {@link TableStats}. */
+   public NullScanner(Configuration conf, Schema schema, TableMeta meta, Fragment fragment) {
+     this.conf = conf;
+     this.meta = meta;
+     this.schema = schema;
+     this.fragment = fragment;
+     this.tableStats = new TableStats();
+     this.columnNum = schema.size();
+   }
+
+   /** Resets progress and reports zero bytes and zero blocks in the stats. */
+   @Override
+   public void init() throws IOException {
+     this.progress = 0.0f;
+     this.tableStats.setNumBytes(0);
+     this.tableStats.setNumBlocks(0);
+   }
+
+   /** Marks the scan as complete and returns {@code null}. */
+   @Override
+   public Tuple next() throws IOException {
+     this.progress = 1.0f;
+     return null;
+   }
+
+   @Override
+   public void reset() throws IOException {
+     this.progress = 0.0f;
+   }
+
+   @Override
+   public void close() throws IOException {
+     this.progress = 1.0f;
+   }
+
+   @Override
+   public boolean isProjectable() {
+     return false;
+   }
+
+   @Override
+   public void setTarget(Column[] targets) {
+     this.targets = targets;
+   }
+
+   @Override
+   public boolean isSelectable() {
+     return true;
+   }
+
+   /** Ignored: the condition is not stored. */
+   @Override
+   public void setSearchCondition(Object expr) {
+
+   }
+
+   @Override
+   public boolean isSplittable() {
+     return true;
+   }
+
+   @Override
+   public float getProgress() {
+     return this.progress;
+   }
+
+   @Override
+   public TableStats getInputStats() {
+     return this.tableStats;
+   }
+
+   @Override
+   public Schema getSchema() {
+     return this.schema;
+   }
+ }
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/NumericPathComparator.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/NumericPathComparator.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/NumericPathComparator.java
new file mode 100644
index 0000000..94d13ee
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/NumericPathComparator.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.hadoop.fs.Path;
+
+import java.util.Comparator;
+
+ /**
+  * Orders {@link Path}s by the integer value of their final name component.
+  */
+ public class NumericPathComparator implements Comparator<Path> {
+
+   /**
+    * Compares the two paths by {@code Integer.parseInt(path.getName())}.
+    *
+    * @return a negative value, zero, or a positive value when {@code p1}'s
+    *         number is less than, equal to, or greater than {@code p2}'s
+    * @throws NumberFormatException if either name is not a parsable integer
+    */
+   @Override
+   public int compare(Path p1, Path p2) {
+     int num1 = Integer.parseInt(p1.getName());
+     int num2 = Integer.parseInt(p2.getName());
+
+     // Explicit comparison: subtracting the two ints can overflow and flip the sign.
+     if (num1 < num2) {
+       return -1;
+     }
+     return num1 == num2 ? 0 : 1;
+   }
+ }
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/RowStoreUtil.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/RowStoreUtil.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/RowStoreUtil.java
new file mode 100644
index 0000000..24b6280
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/RowStoreUtil.java
@@ -0,0 +1,377 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.datum.IntervalDatum;
+import org.apache.tajo.datum.ProtobufDatum;
+import org.apache.tajo.exception.UnsupportedException;
+import org.apache.tajo.storage.exception.UnknownDataTypeException;
+import org.apache.tajo.tuple.offheap.RowWriter;
+import org.apache.tajo.util.BitArray;
+
+import java.nio.ByteBuffer;
+
+public class RowStoreUtil {
+ public static int[] getTargetIds(Schema inSchema, Schema outSchema) {
+ int[] targetIds = new int[outSchema.size()];
+ int i = 0;
+ for (Column target : outSchema.getColumns()) {
+ targetIds[i] = inSchema.getColumnId(target.getQualifiedName());
+ i++;
+ }
+
+ return targetIds;
+ }
+
+ public static Tuple project(Tuple in, Tuple out, int[] targetIds) {
+ out.clear();
+ for (int idx = 0; idx < targetIds.length; idx++) {
+ out.put(idx, in.get(targetIds[idx]));
+ }
+ return out;
+ }
+
+ public static RowStoreEncoder createEncoder(Schema schema) {
+ return new RowStoreEncoder(schema);
+ }
+
+ public static RowStoreDecoder createDecoder(Schema schema) {
+ return new RowStoreDecoder(schema);
+ }
+
+ public static class RowStoreDecoder {
+
+ private Schema schema;
+ private BitArray nullFlags;
+ private int headerSize;
+
+ private RowStoreDecoder(Schema schema) {
+ this.schema = schema;
+ nullFlags = new BitArray(schema.size());
+ headerSize = nullFlags.bytesLength();
+ }
+
+
+ public Tuple toTuple(byte [] bytes) {
+ nullFlags.clear();
+ ByteBuffer bb = ByteBuffer.wrap(bytes);
+ Tuple tuple = new VTuple(schema.size());
+ Column col;
+ TajoDataTypes.DataType type;
+
+ bb.limit(headerSize);
+ nullFlags.fromByteBuffer(bb);
+ bb.limit(bytes.length);
+
+ for (int i =0; i < schema.size(); i++) {
+ if (nullFlags.get(i)) {
+ tuple.put(i, DatumFactory.createNullDatum());
+ continue;
+ }
+
+ col = schema.getColumn(i);
+ type = col.getDataType();
+ switch (type.getType()) {
+ case BOOLEAN: tuple.put(i, DatumFactory.createBool(bb.get())); break;
+ case BIT:
+ byte b = bb.get();
+ tuple.put(i, DatumFactory.createBit(b));
+ break;
+
+ case CHAR:
+ byte c = bb.get();
+ tuple.put(i, DatumFactory.createChar(c));
+ break;
+
+ case INT2:
+ short s = bb.getShort();
+ tuple.put(i, DatumFactory.createInt2(s));
+ break;
+
+ case INT4:
+ case DATE:
+ int i_ = bb.getInt();
+ tuple.put(i, DatumFactory.createFromInt4(type, i_));
+ break;
+
+ case INT8:
+ case TIME:
+ case TIMESTAMP:
+ long l = bb.getLong();
+ tuple.put(i, DatumFactory.createFromInt8(type, l));
+ break;
+
+ case INTERVAL:
+ int month = bb.getInt();
+ long milliseconds = bb.getLong();
+ tuple.put(i, new IntervalDatum(month, milliseconds));
+ break;
+
+ case FLOAT4:
+ float f = bb.getFloat();
+ tuple.put(i, DatumFactory.createFloat4(f));
+ break;
+
+ case FLOAT8:
+ double d = bb.getDouble();
+ tuple.put(i, DatumFactory.createFloat8(d));
+ break;
+
+ case TEXT:
+ byte [] _string = new byte[bb.getInt()];
+ bb.get(_string);
+ tuple.put(i, DatumFactory.createText(_string));
+ break;
+
+ case BLOB:
+ byte [] _bytes = new byte[bb.getInt()];
+ bb.get(_bytes);
+ tuple.put(i, DatumFactory.createBlob(_bytes));
+ break;
+
+ case INET4:
+ byte [] _ipv4 = new byte[4];
+ bb.get(_ipv4);
+ tuple.put(i, DatumFactory.createInet4(_ipv4));
+ break;
+ case INET6:
+ // TODO - to be implemented
+ throw new UnsupportedException(type.getType().name());
+ default:
+ throw new RuntimeException(new UnknownDataTypeException(type.getType().name()));
+ }
+ }
+ return tuple;
+ }
+
+ public Schema getSchema() {
+ return schema;
+ }
+ }
+
+ public static class RowStoreEncoder {
+ private Schema schema;
+ private BitArray nullFlags;
+ private int headerSize;
+
+ private RowStoreEncoder(Schema schema) {
+ this.schema = schema;
+ nullFlags = new BitArray(schema.size());
+ headerSize = nullFlags.bytesLength();
+ }
+
+ public byte[] toBytes(Tuple tuple) {
+ nullFlags.clear();
+ int size = estimateTupleDataSize(tuple);
+ ByteBuffer bb = ByteBuffer.allocate(size + headerSize);
+ bb.position(headerSize);
+ Column col;
+ for (int i = 0; i < schema.size(); i++) {
+ if (tuple.isNull(i)) {
+ nullFlags.set(i);
+ continue;
+ }
+
+ col = schema.getColumn(i);
+ switch (col.getDataType().getType()) {
+ case NULL_TYPE:
+ nullFlags.set(i);
+ break;
+ case BOOLEAN:
+ bb.put(tuple.get(i).asByte());
+ break;
+ case BIT:
+ bb.put(tuple.get(i).asByte());
+ break;
+ case CHAR:
+ bb.put(tuple.get(i).asByte());
+ break;
+ case INT2:
+ bb.putShort(tuple.get(i).asInt2());
+ break;
+ case INT4:
+ bb.putInt(tuple.get(i).asInt4());
+ break;
+ case INT8:
+ bb.putLong(tuple.get(i).asInt8());
+ break;
+ case FLOAT4:
+ bb.putFloat(tuple.get(i).asFloat4());
+ break;
+ case FLOAT8:
+ bb.putDouble(tuple.get(i).asFloat8());
+ break;
+ case TEXT:
+ byte[] _string = tuple.get(i).asByteArray();
+ bb.putInt(_string.length);
+ bb.put(_string);
+ break;
+ case DATE:
+ bb.putInt(tuple.get(i).asInt4());
+ break;
+ case TIME:
+ case TIMESTAMP:
+ bb.putLong(tuple.get(i).asInt8());
+ break;
+ case INTERVAL:
+ IntervalDatum interval = (IntervalDatum) tuple.get(i);
+ bb.putInt(interval.getMonths());
+ bb.putLong(interval.getMilliSeconds());
+ break;
+ case BLOB:
+ byte[] bytes = tuple.get(i).asByteArray();
+ bb.putInt(bytes.length);
+ bb.put(bytes);
+ break;
+ case INET4:
+ byte[] ipBytes = tuple.get(i).asByteArray();
+ bb.put(ipBytes);
+ break;
+ case INET6:
+ bb.put(tuple.get(i).asByteArray());
+ break;
+ default:
+ throw new RuntimeException(new UnknownDataTypeException(col.getDataType().getType().name()));
+ }
+ }
+
+ byte[] flags = nullFlags.toArray();
+ int finalPosition = bb.position();
+ bb.position(0);
+ bb.put(flags);
+
+ bb.position(finalPosition);
+ bb.flip();
+ byte[] buf = new byte[bb.limit()];
+ bb.get(buf);
+ return buf;
+ }
+
+ // Note that, NULL values are treated separately
+ private int estimateTupleDataSize(Tuple tuple) {
+ int size = 0;
+ Column col;
+
+ for (int i = 0; i < schema.size(); i++) {
+ if (tuple.isNull(i)) {
+ continue;
+ }
+
+ col = schema.getColumn(i);
+ switch (col.getDataType().getType()) {
+ case BOOLEAN:
+ case BIT:
+ case CHAR:
+ size += 1;
+ break;
+ case INT2:
+ size += 2;
+ break;
+ case DATE:
+ case INT4:
+ case FLOAT4:
+ size += 4;
+ break;
+ case TIME:
+ case TIMESTAMP:
+ case INT8:
+ case FLOAT8:
+ size += 8;
+ break;
+ case INTERVAL:
+ size += 12;
+ break;
+ case TEXT:
+ case BLOB:
+ size += (4 + tuple.get(i).asByteArray().length);
+ break;
+ case INET4:
+ case INET6:
+ size += tuple.get(i).asByteArray().length;
+ break;
+ default:
+ throw new RuntimeException(new UnknownDataTypeException(col.getDataType().getType().name()));
+ }
+ }
+
+ size += 100; // optimistic reservation
+
+ return size;
+ }
+
+ public Schema getSchema() {
+ return schema;
+ }
+ }
+
+ public static void convert(Tuple tuple, RowWriter writer) {
+ writer.startRow();
+
+ for (int i = 0; i < writer.dataTypes().length; i++) {
+ if (tuple.isNull(i)) {
+ writer.skipField();
+ continue;
+ }
+ switch (writer.dataTypes()[i].getType()) {
+ case BOOLEAN:
+ writer.putBool(tuple.getBool(i));
+ break;
+ case INT1:
+ case INT2:
+ writer.putInt2(tuple.getInt2(i));
+ break;
+ case INT4:
+ case DATE:
+ case INET4:
+ writer.putInt4(tuple.getInt4(i));
+ break;
+ case INT8:
+ case TIMESTAMP:
+ case TIME:
+ writer.putInt8(tuple.getInt8(i));
+ break;
+ case FLOAT4:
+ writer.putFloat4(tuple.getFloat4(i));
+ break;
+ case FLOAT8:
+ writer.putFloat8(tuple.getFloat8(i));
+ break;
+ case TEXT:
+ writer.putText(tuple.getBytes(i));
+ break;
+ case INTERVAL:
+ writer.putInterval((IntervalDatum) tuple.getInterval(i));
+ break;
+ case PROTOBUF:
+ writer.putProtoDatum((ProtobufDatum) tuple.getProtobufDatum(i));
+ break;
+ case NULL_TYPE:
+ writer.skipField();
+ break;
+ default:
+ throw new UnsupportedException("Unknown data type: " + writer.dataTypes()[i]);
+ }
+ }
+ writer.endRow();
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Scanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Scanner.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Scanner.java
new file mode 100644
index 0000000..0356b19
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Scanner.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.SchemaObject;
+import org.apache.tajo.catalog.statistics.TableStats;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Scanner Interface. A scanner hands out the tuples of its input one at a
+ * time through {@link #next()} and can be rewound with {@link #reset()}.
+ *
+ * NOTE(review): init() and getInputStats() carry no documentation of their
+ * own. Presumably init() has to be called once before the first next(), and
+ * getInputStats() reports statistics about the scanned input - confirm
+ * against the implementations.
+ */
+public interface Scanner extends SchemaObject, Closeable {
+
+ void init() throws IOException;
+
+ /**
+ * Returns one tuple per call.
+ *
+ * @return the next tuple, or null if the scanner has no more tuples.
+ *
+ * @throws java.io.IOException if an internal I/O error occurs while reading
+ */
+ Tuple next() throws IOException;
+
+ /**
+ * Resets the cursor. Afterwards, the scanner
+ * will retrieve the first tuple again.
+ *
+ * @throws java.io.IOException if an internal I/O error occurs during the reset
+ */
+ void reset() throws IOException;
+
+ /**
+ * Closes the scanner.
+ *
+ * @throws java.io.IOException if an internal I/O error occurs during the close
+ */
+ void close() throws IOException;
+
+
+ /**
+ * Tells whether the projection is executed in the underlying scanner layer.
+ *
+ * @return true if this scanner can project the given columns.
+ */
+ boolean isProjectable();
+
+ /**
+ * Sets the target columns.
+ * @param targets columns to be projected
+ */
+ void setTarget(Column[] targets);
+
+ /**
+ * Tells whether the selection is executed in the underlying scanner layer.
+ *
+ * @return true if this scanner can filter tuples against a given condition.
+ */
+ boolean isSelectable();
+
+ /**
+ * Sets a search condition.
+ * @param expr the condition to be searched for
+ *
+ * TODO - replace the Object parameter type with a more specific type
+ */
+ void setSearchCondition(Object expr);
+
+ /**
+ * Tells whether the file is splittable.
+ *
+ * @return true if this scanner can split a file.
+ */
+ boolean isSplittable();
+
+ /**
+ * Reports how much of the input the Scanner has consumed.
+ * @return progress from <code>0.0</code> to <code>1.0</code>.
+ */
+ float getProgress();
+
+ TableStats getInputStats();
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/SeekableScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/SeekableScanner.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/SeekableScanner.java
new file mode 100644
index 0000000..894e7ee
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/SeekableScanner.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import java.io.IOException;
+
+/**
+ * A {@link Scanner} whose read position can be queried and moved.
+ */
+public interface SeekableScanner extends Scanner {
+
+  /**
+   * Returns an offset into the scanned input.
+   * NOTE(review): presumably the offset at which the next tuple starts, in
+   * the same unit {@link #seek(long)} accepts - confirm against the
+   * implementations.
+   *
+   * @return the offset
+   * @throws IOException declared for implementations that need to do I/O
+   */
+  long getNextOffset() throws IOException;
+
+  /**
+   * Moves the scanner to the given offset.
+   *
+   * @param offset the offset to move to
+   * @throws IOException declared for implementations that need to do I/O
+   */
+  void seek(long offset) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/SerializerDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/SerializerDeserializer.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/SerializerDeserializer.java
new file mode 100644
index 0000000..564a9f5
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/SerializerDeserializer.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.datum.Datum;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * Converts single {@link Datum} values to and from a byte representation.
+ *
+ * NOTE(review): this interface is marked deprecated, but the replacement is
+ * not named here - add an {@code @deprecated} tag once it is confirmed.
+ */
+@Deprecated
+public interface SerializerDeserializer {
+
+  /**
+   * Writes {@code datum} to {@code out}.
+   *
+   * @param col            the column the datum belongs to
+   * @param datum          the value to write
+   * @param out            the stream to write to
+   * @param nullCharacters presumably the byte sequence that stands for a
+   *                       NULL value - confirm against the implementations
+   * @return an int; presumably the number of bytes written - confirm
+   * @throws IOException declared for failures while writing
+   */
+  int serialize(Column col, Datum datum, OutputStream out, byte[] nullCharacters) throws IOException;
+
+  /**
+   * Reads a datum of column {@code col} from a region of {@code bytes}.
+   *
+   * @param col            the column the datum belongs to
+   * @param bytes          the array holding the serialized value
+   * @param offset         the start of the region within {@code bytes}
+   * @param length         the length of the region
+   * @param nullCharacters presumably the byte sequence that stands for a
+   *                       NULL value - confirm against the implementations
+   * @return the deserialized datum
+   * @throws IOException declared for failures while reading
+   */
+  Datum deserialize(Column col, byte[] bytes, int offset, int length, byte[] nullCharacters) throws IOException;
+
+}