You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by jh...@apache.org on 2014/01/28 13:35:39 UTC
[02/18] TAJO-520: Move tajo-core-storage to tajo-storage. (jinho)
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/main/java/org/apache/tajo/storage/v2/ScheduledInputStream.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/v2/ScheduledInputStream.java b/tajo-storage/src/main/java/org/apache/tajo/storage/v2/ScheduledInputStream.java
new file mode 100644
index 0000000..12b984e
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/v2/ScheduledInputStream.java
@@ -0,0 +1,513 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.v2;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.Seekable;
+
+import java.io.*;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class ScheduledInputStream extends InputStream implements Seekable, Closeable, DataInput {
+ private static final Log LOG = LogFactory.getLog(ScheduledInputStream.class);
+
+ // Underlying stream; readNext() pulls bytes from it into the buffers below.
+ private FSDataInputStream originStream;
+
+ // Read offset of the consumer inside currentScanData.
+ private int currentScanIndex;
+
+ // Chunks fetched by readNext() that the consumer has not started yet.
+ // The queue object also serves as the monitor for wait()/notifyAll().
+ private Queue<ScanData> dataQueue = new LinkedList<ScanData>();
+
+ // Chunk currently being consumed by the read methods; null when nothing is buffered.
+ private ScanData currentScanData;
+
+ // Set by close(); cleared again by reset().
+ private AtomicBoolean closed = new AtomicBoolean(false);
+
+ // Set by readNext() once the origin stream position equals fileLen.
+ private boolean eof = false;
+
+ // Position reported by getPos(); advanced by the read methods, set by seek().
+ private long pos;
+
+ // Bytes added by readNext() minus bytes handed out by the read methods.
+ private AtomicInteger avaliableSize = new AtomicInteger(0);
+
+ // Length of the whole file, as passed to the constructor.
+ private long fileLen;
+
+ // First byte offset of the range this stream was opened for.
+ private long startOffset;
+
+ // Number of bytes in the range this stream was opened for.
+ private long length;
+
+ // startOffset + length.
+ private long endOffset;
+
+ // Set by readNext() once the origin stream position reaches endOffset.
+ private boolean endOfStream = false;
+
+ // File being read; only used for log messages via toString().
+ private Path file;
+
+ // Scratch buffer reused by readLong().
+ private byte readLongBuffer[] = new byte[8];
+
+ // Total bytes handed out to callers of the read methods.
+ private AtomicLong totalReadBytesForFetch = new AtomicLong(0);
+
+ // Total bytes pulled from the origin stream by readNext().
+ private AtomicLong totalReadBytesFromDisk = new AtomicLong(0);
+
+ /**
+  * Wraps {@code originStream} for the byte range
+  * {@code [startOffset, startOffset + length)} of {@code file}.
+  *
+  * @param file         file being read (used for log messages)
+  * @param originStream underlying stream; its current position becomes the initial position
+  * @param startOffset  first byte offset of the range
+  * @param length       number of bytes in the range
+  * @param fileLen      total length of the file
+  * @throws IOException if the position of {@code originStream} cannot be obtained
+  */
+ public ScheduledInputStream(Path file, FSDataInputStream originStream,
+     long startOffset, long length, long fileLen) throws IOException {
+   this.file = file;
+   this.fileLen = fileLen;
+   this.startOffset = startOffset;
+   this.length = length;
+   this.endOffset = startOffset + length;
+   this.originStream = originStream;
+   this.pos = originStream.getPos();
+
+   LOG.info("Open:" + this);
+ }
+
+ /**
+  * @return the number of bytes fetched by readNext() that have not yet been
+  *         handed out by the read methods
+  */
+ public int getAvaliableSize() {
+   final int buffered = avaliableSize.get();
+   return buffered;
+ }
+
+ /**
+  * @return {@code <file name>:<start offset>:<length>}, used in log messages
+  */
+ @Override
+ public String toString() {
+   final StringBuilder text = new StringBuilder();
+   text.append(file.getName());
+   text.append(":").append(startOffset);
+   text.append(":").append(length);
+   return text.toString();
+ }
+ /**
+  * Fetches the next chunk without reading past the end of this stream's range.
+  *
+  * @param length maximum number of bytes to fetch
+  * @return the result of {@code readNext(length, false)}
+  * @throws IOException if reading from the origin stream fails
+  */
+ public boolean readNext(int length) throws IOException {
+   final boolean ignoreEndOfStream = false;
+   return readNext(length, ignoreEndOfStream);
+ }
+
+ /**
+  * Fetches up to {@code length} bytes from the origin stream and hands them to
+  * the consumer, either as the current chunk (when none is set) or by
+  * appending to the data queue. Waiting readers are notified.
+  *
+  * @param length    maximum number of bytes to fetch
+  * @param ignoreEOS when {@code true} the fetch may run past {@code endOffset}
+  *                  (it is still limited by the file length)
+  * @return {@code false} if the stream is closed, the range end was already
+  *         reached (and {@code ignoreEOS} is false), nothing is left to fetch,
+  *         or the end of the file has been reached; {@code true} otherwise
+  * @throws IOException if reading from the origin stream fails
+  */
+ public boolean readNext(int length, boolean ignoreEOS) throws IOException {
+   synchronized(dataQueue) {
+     if(closed.get() || (!ignoreEOS && endOfStream)) {
+       return false;
+     }
+     long originPos = originStream.getPos();
+     long readable = fileLen - originPos;
+     if(!ignoreEOS) {
+       readable = Math.min(readable, endOffset - originPos);
+     }
+     int bufLength = (int)Math.min((long)length, readable);
+     // The origin position can lie beyond endOffset/fileLen (for example after
+     // seek() or reset()); a non-positive size must not reach the allocation.
+     if(bufLength <= 0) {
+       return false;
+     }
+     byte[] buf = new byte[bufLength];
+
+     try {
+       originStream.readFully(buf);
+     } catch (EOFException e) {
+       LOG.error(e.getMessage(), e);
+       throw e;
+     } catch (Exception e) {
+       throw new IOException(e.getMessage(), e);
+     }
+
+     if(originStream.getPos() == fileLen) {
+       LOG.info("EOF:" + toString());
+       eof = true;
+     }
+     if(!ignoreEOS && originStream.getPos() >= endOffset) {
+       LOG.info("EndOfStream:" + toString());
+       endOfStream = true;
+     }
+
+     if(currentScanData == null) {
+       currentScanData = new ScanData(buf, bufLength);
+       currentScanIndex = 0;
+     } else {
+       dataQueue.offer(new ScanData(buf, bufLength));
+     }
+
+     avaliableSize.addAndGet(bufLength);
+
+     if(LOG.isDebugEnabled()) {
+       LOG.debug("Add DataQueue: queue=" + dataQueue.size() +
+           ", avaliable Size=" + avaliableSize.get() + ", pos=" + getPos() +
+           ", originPos=" + originStream.getPos() + ",endOfStream=" + endOfStream +
+           ", bufLength=" + bufLength + ",ignoreEOS=" + ignoreEOS);
+     }
+
+     totalReadBytesFromDisk.addAndGet(bufLength);
+     dataQueue.notifyAll();
+     // Read the flag while still holding the lock so the result matches this fetch.
+     return !eof;
+   }
+ }
+
+ /** One fetched chunk: the backing array and the number of valid bytes in it. */
+ static class ScanData {
+   byte[] data;
+   int length;
+
+   /**
+    * @param buf    backing array holding the fetched bytes
+    * @param length number of valid bytes in {@code buf}
+    */
+   public ScanData(byte[] buf, int length) {
+     this.length = length;
+     this.data = buf;
+   }
+
+   /** @return {@code length=<n>} */
+   @Override
+   public String toString() {
+     final StringBuilder text = new StringBuilder("length=");
+     return text.append(length).toString();
+   }
+ }
+
+ /**
+  * Discards everything buffered so far and repositions both the origin
+  * stream and this stream's reported position to {@code pos}.
+  *
+  * @param pos absolute offset to move to
+  * @throws IOException if the origin stream cannot seek
+  */
+ @Override
+ public void seek(long pos) throws IOException {
+ synchronized(dataQueue) {
+ // Buffered chunks belong to the old position and are dropped before the seek.
+ dataQueue.clear();
+ currentScanData = null;
+ currentScanIndex = 0;
+ avaliableSize.set(0);
+ originStream.seek(pos);
+ this.pos = pos;
+ // NOTE(review): eof and endOfStream are not touched here, so they keep the
+ // values from before the seek; confirm that is intended.
+ }
+ }
+
+ /**
+  * @return the position of the consumer side of this stream (not the origin
+  *         stream position, see getOriginStreamPos())
+  */
+ @Override
+ public long getPos() throws IOException {
+   final long consumerPos = this.pos;
+   return consumerPos;
+ }
+
+ /**
+  * @return the current position of the origin stream, or {@code 0} if it
+  *         cannot be obtained (the failure is logged)
+  */
+ public long getOriginStreamPos() {
+   try {
+     return this.originStream.getPos();
+   } catch (IOException e) {
+     // Report through the class logger instead of printing to stderr.
+     LOG.error(e.getMessage(), e);
+     return 0;
+   }
+ }
+
+ /**
+  * Discards everything buffered so far and delegates to
+  * {@code originStream.seekToNewSource(targetPos)}; afterwards this stream's
+  * reported position is taken from the origin stream.
+  *
+  * @param targetPos offset passed through to the origin stream
+  * @return whatever the origin stream returns
+  * @throws IOException if the origin stream fails
+  */
+ @Override
+ public boolean seekToNewSource(long targetPos) throws IOException {
+ synchronized(dataQueue) {
+ // Buffered chunks are dropped before the origin stream is repositioned.
+ dataQueue.clear();
+ currentScanData = null;
+ currentScanIndex = 0;
+ avaliableSize.set(0);
+ boolean result = originStream.seekToNewSource(targetPos);
+
+ this.pos = originStream.getPos();
+ return result;
+ }
+ }
+
+ @Override
+ public int read() throws IOException {
+ if(noMoreData()) {
+ return -1;
+ }
+ if(currentScanData == null || currentScanIndex >= currentScanData.length) {
+ synchronized(dataQueue) {
+ if(dataQueue.isEmpty()) {
+ if(endOfStream) {
+ readNext(64 * 1024, true);
+ } else {
+ try {
+ dataQueue.wait();
+ if(eof && dataQueue.isEmpty() && currentScanIndex > 0) {
+ //no more data
+ return -1;
+ }
+ } catch (InterruptedException e) {
+ }
+ }
+ }
+ if(!dataQueue.isEmpty() && currentScanIndex > 0) {
+ currentScanData = dataQueue.poll();
+ currentScanIndex = 0;
+ }
+ }
+ }
+
+ this.pos++;
+ avaliableSize.decrementAndGet();
+ totalReadBytesForFetch.incrementAndGet();
+
+ return currentScanData.data[currentScanIndex++] & 0xff;
+ }
+
+ private boolean noMoreData() {
+ return closed.get();
+ }
+
+ /**
+  * Copies buffered bytes into {@code b}, moving across chunks as needed.
+  * Unlike a typical {@code InputStream.read}, the loop below keeps going
+  * until {@code len} bytes were copied or no further chunk can be obtained.
+  *
+  * @param b   destination array
+  * @param off start offset in {@code b}
+  * @param len number of bytes requested
+  * @return number of bytes copied, {@code 0} when {@code len} is 0, or
+  *         {@code -1} if the stream is closed or nothing is left
+  * @throws IOException if fetching from the origin stream fails
+  */
+ public int read(byte b[], int off, int len) throws IOException {
+ if(noMoreData()) {
+ return -1;
+ }
+ if (b == null) {
+ throw new NullPointerException();
+ } else if (off < 0 || len < 0 || len > b.length - off) {
+ throw new IndexOutOfBoundsException();
+ } else if (len == 0) {
+ return 0;
+ }
+ // No chunk yet: fetch one directly (range end already reached) or wait for readNext().
+ if(currentScanData == null) {
+ synchronized(dataQueue) {
+ if(dataQueue.isEmpty()) {
+ if(endOfStream) {
+ readNext(64 * 1024, true);
+ } else {
+ try {
+ dataQueue.wait();
+ if(noMoreData()) {
+ return -1;
+ }
+ } catch (InterruptedException e) {
+ // NOTE(review): the interrupt is swallowed and execution continues;
+ // currentScanData may still be null below.
+ }
+ }
+ }
+ if(!dataQueue.isEmpty() && currentScanIndex > 0) {
+ currentScanData = dataQueue.poll();
+ currentScanIndex = 0;
+ }
+ }
+ }
+
+ int numRemainBytes = currentScanData.length - currentScanIndex;
+ // Fast path: the current chunk alone satisfies the request.
+ if(numRemainBytes > len) {
+ System.arraycopy(currentScanData.data, currentScanIndex, b, off, len);
+ currentScanIndex += len;
+ avaliableSize.addAndGet(0 - len);
+ pos += len;
+
+ totalReadBytesForFetch.addAndGet(len);
+ return len;
+ } else {
+ // Slow path: drain the current chunk, then continue with following chunks.
+ int offset = off;
+ int length = 0;
+ int numCopyBytes = numRemainBytes;
+ while(true) {
+ synchronized(dataQueue) {
+ // Nothing to copy and nothing more can arrive.
+ if(numCopyBytes == 0 && eof && dataQueue.isEmpty()) {
+ return -1;
+ }
+ }
+ System.arraycopy(currentScanData.data, currentScanIndex, b, offset, numCopyBytes);
+ currentScanIndex += numCopyBytes;
+ offset += numCopyBytes;
+ length += numCopyBytes;
+ if(length >= len) {
+ break;
+ }
+ synchronized(dataQueue) {
+ if(dataQueue.isEmpty()) {
+ if(eof) {
+ break;
+ }
+ if(endOfStream) {
+ readNext(64 * 1024, true);
+ } else {
+ try {
+ dataQueue.wait();
+ } catch (InterruptedException e) {
+ // NOTE(review): swallowed; if no chunk arrived, the copy at the top of
+ // the loop runs again against the already exhausted chunk.
+ }
+ }
+ }
+ if(eof && dataQueue.isEmpty()) {
+ break;
+ }
+ if(!dataQueue.isEmpty() && currentScanIndex > 0) {
+ currentScanData = dataQueue.poll();
+ currentScanIndex = 0;
+ }
+ if(currentScanData == null) {
+ break;
+ }
+ }
+ // Copy at most what is still missing from the new chunk.
+ if(currentScanData.length >= (len - length)) {
+ numCopyBytes = (len - length);
+ } else {
+ numCopyBytes = currentScanData.length;
+ }
+ } //end of while
+ this.pos += length;
+ avaliableSize.addAndGet(0 - length);
+
+ totalReadBytesForFetch.addAndGet(length);
+ return length;
+ }
+ }
+
+ /** @return total number of bytes handed out by the read methods */
+ public long getTotalReadBytesForFetch() {
+   final long fetched = totalReadBytesForFetch.get();
+   return fetched;
+ }
+
+ /** @return total number of bytes pulled from the origin stream by readNext() */
+ public long getTotalReadBytesFromDisk() {
+   final long fromDisk = totalReadBytesFromDisk.get();
+   return fromDisk;
+ }
+
+ /**
+  * Closes the origin stream, drops queued chunks and wakes any reader blocked
+  * on the data queue. Calling it again is a no-op.
+  *
+  * @throws IOException if closing the origin stream fails
+  */
+ @Override
+ public void close() throws IOException {
+   LOG.info("Close:" + toString());
+   synchronized(dataQueue) {
+     if(closed.get()) {
+       return;
+     }
+     closed.set(true);
+     try {
+       originStream.close();
+     } finally {
+       // Release buffers even when the origin stream fails to close.
+       dataQueue.clear();
+       currentScanIndex = 0;
+       // Readers waiting for data would otherwise block forever: readNext()
+       // refuses to deliver once the closed flag is set.
+       dataQueue.notifyAll();
+     }
+     super.close();
+   }
+ }
+
+ /**
+  * Fills the whole array.
+  *
+  * @throws EOFException if the data ends before {@code b} is full
+  */
+ @Override
+ public void readFully(byte[] b) throws IOException {
+   final int wanted = b.length;
+   readFully(b, 0, wanted);
+ }
+
+ /**
+  * Reads exactly {@code len} bytes into {@code b} starting at {@code off}.
+  *
+  * @throws IndexOutOfBoundsException if {@code len} is negative
+  * @throws EOFException if the data ends before {@code len} bytes were read
+  */
+ @Override
+ public void readFully(byte[] b, int off, int len) throws IOException {
+   if (len < 0) {
+     throw new IndexOutOfBoundsException();
+   }
+   for (int filled = 0; filled < len; ) {
+     final int got = read(b, off + filled, len - filled);
+     if (got < 0) {
+       throw new EOFException();
+     }
+     filled += got;
+   }
+ }
+
+ /**
+  * Skips up to {@code bytes} bytes by calling {@code skip} repeatedly until
+  * the request is met or {@code skip} makes no progress.
+  *
+  * @return number of bytes actually skipped
+  */
+ @Override
+ public int skipBytes(int bytes) throws IOException {
+   int skipped = 0;
+   while (skipped < bytes) {
+     final int step = (int)skip(bytes - skipped);
+     if (step <= 0) {
+       break;
+     }
+     skipped += step;
+   }
+   return skipped;
+ }
+
+ /**
+  * @return {@code true} if the next byte is non-zero
+  * @throws EOFException if no byte is available
+  */
+ @Override
+ public boolean readBoolean() throws IOException {
+   final int raw = read();
+   if (raw >= 0) {
+     return raw != 0;
+   }
+   throw new EOFException();
+ }
+
+ /**
+  * @return the next byte as a signed value
+  * @throws EOFException if no byte is available
+  */
+ @Override
+ public byte readByte() throws IOException {
+   final int raw = read();
+   if (raw >= 0) {
+     return (byte)raw;
+   }
+   throw new EOFException();
+ }
+
+ /**
+  * @return the next byte as a value in {@code 0..255}
+  * @throws EOFException if no byte is available
+  */
+ @Override
+ public int readUnsignedByte() throws IOException {
+   final int raw = read();
+   if (raw >= 0) {
+     return raw;
+   }
+   throw new EOFException();
+ }
+
+ /**
+  * Reads two bytes, high byte first.
+  *
+  * @return the assembled signed 16-bit value
+  * @throws EOFException if either byte is unavailable
+  */
+ @Override
+ public short readShort() throws IOException {
+   final int high = read();
+   final int low = read();
+   // A -1 from read() makes the OR negative.
+   if ((high | low) < 0) {
+     throw new EOFException();
+   }
+   final int combined = (high << 8) | low;
+   return (short)combined;
+ }
+
+ /**
+  * Reads two bytes, high byte first.
+  *
+  * @return the assembled value in {@code 0..65535}
+  * @throws EOFException if either byte is unavailable
+  */
+ @Override
+ public int readUnsignedShort() throws IOException {
+   final int high = read();
+   final int low = read();
+   // A -1 from read() makes the OR negative.
+   if ((high | low) < 0) {
+     throw new EOFException();
+   }
+   return (high << 8) | low;
+ }
+
+ /**
+  * Reads two bytes, high byte first.
+  *
+  * @return the assembled {@code char}
+  * @throws EOFException if either byte is unavailable
+  */
+ @Override
+ public char readChar() throws IOException {
+   final int high = read();
+   final int low = read();
+   // A -1 from read() makes the OR negative.
+   if ((high | low) < 0) {
+     throw new EOFException();
+   }
+   final int combined = (high << 8) | low;
+   return (char)combined;
+ }
+
+ /**
+  * Reads four bytes, most significant first.
+  *
+  * @return the assembled 32-bit value
+  * @throws EOFException if any of the four bytes is unavailable
+  */
+ @Override
+ public int readInt() throws IOException {
+   final int b3 = read();
+   final int b2 = read();
+   final int b1 = read();
+   final int b0 = read();
+   // A -1 from read() makes the OR negative.
+   if ((b3 | b2 | b1 | b0) < 0) {
+     throw new EOFException();
+   }
+   return (b3 << 24) | (b2 << 16) | (b1 << 8) | b0;
+ }
+
+ @Override
+ public long readLong() throws IOException {
+ readFully(readLongBuffer, 0, 8);
+ return (((long) readLongBuffer[0] << 56) +
+ ((long)(readLongBuffer[1] & 255) << 48) +
+ ((long)(readLongBuffer[2] & 255) << 40) +
+ ((long)(readLongBuffer[3] & 255) << 32) +
+ ((long)(readLongBuffer[4] & 255) << 24) +
+ ((readLongBuffer[5] & 255) << 16) +
+ ((readLongBuffer[6] & 255) << 8) +
+ ((readLongBuffer[7] & 255) << 0));
+ }
+
+ /** @return the next four bytes interpreted as the bit pattern of a float */
+ @Override
+ public float readFloat() throws IOException {
+   final int bits = readInt();
+   return Float.intBitsToFloat(bits);
+ }
+
+ /** @return the next eight bytes interpreted as the bit pattern of a double */
+ @Override
+ public double readDouble() throws IOException {
+   final long bits = readLong();
+   return Double.longBitsToDouble(bits);
+ }
+
+ /**
+  * Not supported by this stream.
+  *
+  * @throws IOException always
+  */
+ @Override
+ public String readLine() throws IOException {
+   final String reason = "Unsupported operation: readLine";
+   throw new IOException(reason);
+ }
+
+ /**
+  * Not supported by this stream.
+  *
+  * @throws IOException always
+  */
+ @Override
+ public String readUTF() throws IOException {
+   final String reason = "Unsupported operation: readUTF";
+   throw new IOException(reason);
+ }
+
+ /** @return {@code true} once readNext() has reached the end of the file */
+ public boolean isEOF() {
+   final boolean reachedFileEnd = eof;
+   return reachedFileEnd;
+ }
+
+ /** @return {@code true} once readNext() has reached the end of this stream's range */
+ public boolean isEndOfStream() {
+   final boolean reachedRangeEnd = endOfStream;
+   return reachedRangeEnd;
+ }
+
+ /**
+  * Clears the end-of-file, end-of-range and closed flags and drops all
+  * buffered data. The origin stream and the reported position are untouched.
+  */
+ @Override
+ public void reset() {
+   synchronized(dataQueue) {
+     endOfStream = false;
+     eof = false;
+     closed.set(false);
+     dataQueue.clear();
+     currentScanIndex = 0;
+     currentScanData = null;
+     // No buffered bytes remain, so the counter must drop to zero as well
+     // (seek() and seekToNewSource() already do this).
+     avaliableSize.set(0);
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/main/java/org/apache/tajo/storage/v2/StorageManagerV2.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/v2/StorageManagerV2.java b/tajo-storage/src/main/java/org/apache/tajo/storage/v2/StorageManagerV2.java
new file mode 100644
index 0000000..cffff00
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/v2/StorageManagerV2.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.v2;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.storage.AbstractStorageManager;
+import org.apache.tajo.storage.Scanner;
+import org.apache.tajo.storage.fragment.Fragment;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.Queue;
+
+public final class StorageManagerV2 extends AbstractStorageManager {
+ private final Log LOG = LogFactory.getLog(StorageManagerV2.class);
+
+ // Scanners registered through requestFileScan(); exposed via the context.
+ private Queue<FileScannerV2> scanQueue = new LinkedList<FileScannerV2>();
+
+ // Monitor guarding scanQueue; notified whenever a scanner is queued.
+ private Object scanQueueLock = new Object();
+
+ // Handed out through the context; not used directly in this class.
+ private Object scanDataLock = new Object();
+
+ // Created and started in the constructor, stopped in stop().
+ private ScanScheduler scanScheduler;
+
+ // Accessor object shared with the scheduler and with FileScannerV2 instances.
+ private StorgaeManagerContext context;
+
+ /**
+  * Creates the manager, builds its context and starts the scan scheduler.
+  *
+  * @param conf configuration passed on to the superclass
+  * @throws IOException propagated from the superclass constructor
+  */
+ public StorageManagerV2(TajoConf conf) throws IOException {
+   super(conf);
+   this.context = new StorgaeManagerContext();
+   this.scanScheduler = new ScanScheduler(this.context);
+   this.scanScheduler.start();
+   LOG.info("StorageManager v2 started...");
+ }
+
+ /**
+  * Resolves the v2 scanner class for a store type. The class is looked up in
+  * the handler cache under {@code <lower-cased type>_v2} and otherwise read
+  * from the configuration key
+  * {@code tajo.storage.scanner-handler.v2.<lower-cased type>.class}.
+  *
+  * @param storeType store type to resolve
+  * @return the scanner class, or {@code null} if none is configured
+  * @throws IOException declared by the overridden method
+  */
+ @Override
+ public Class<? extends Scanner> getScannerClass(CatalogProtos.StoreType storeType) throws IOException {
+   String handlerName = storeType.name().toLowerCase();
+   String handlerNameKey = handlerName + "_v2";
+
+   Class<? extends Scanner> scannerClass = SCANNER_HANDLER_CACHE.get(handlerNameKey);
+   if (scannerClass == null) {
+     scannerClass = conf.getClass(
+         String.format("tajo.storage.scanner-handler.v2.%s.class", handlerName), null, Scanner.class);
+     // Only cache real hits: a null value adds nothing to the cache and is
+     // rejected with a NullPointerException by maps that forbid null values.
+     if (scannerClass != null) {
+       SCANNER_HANDLER_CACHE.put(handlerNameKey, scannerClass);
+     }
+   }
+
+   return scannerClass;
+ }
+
+ /**
+  * Creates a scanner for {@code fragment}. Projectable scanners receive the
+  * target columns, and {@code FileScannerV2} instances are given this
+  * manager's context.
+  *
+  * @throws IOException if no scanner class is registered for the store type
+  */
+ @Override
+ public Scanner getScanner(TableMeta meta, Schema schema, Fragment fragment, Schema target) throws IOException {
+   final Class<? extends Scanner> handler = getScannerClass(meta.getStoreType());
+   if (handler == null) {
+     throw new IOException("Unknown Storage Type: " + meta.getStoreType());
+   }
+
+   final Scanner created = newScannerInstance(handler, conf, schema, meta, fragment);
+   if (created.isProjectable()) {
+     created.setTarget(target.toArray());
+   }
+
+   if (created instanceof FileScannerV2) {
+     final FileScannerV2 fileScanner = (FileScannerV2) created;
+     fileScanner.setStorageManagerContext(context);
+   }
+   return created;
+ }
+
+ /**
+  * Queues {@code fileScanner} and notifies every thread waiting on the scan
+  * queue lock.
+  */
+ public void requestFileScan(FileScannerV2 fileScanner) {
+   final Object lock = scanQueueLock;
+   synchronized(lock) {
+     scanQueue.offer(fileScanner);
+     lock.notifyAll();
+   }
+ }
+
+ /** @return the context object created in the constructor */
+ public StorgaeManagerContext getContext() {
+   final StorgaeManagerContext shared = this.context;
+   return shared;
+ }
+
+ /**
+  * Gives collaborators access to the enclosing manager's queue, locks,
+  * configuration and scheduler.
+  */
+ public class StorgaeManagerContext {
+   /** @return the monitor guarding the scan queue */
+   public Object getScanQueueLock() {
+     return StorageManagerV2.this.scanQueueLock;
+   }
+
+   /** @return the scan data lock object of the enclosing manager */
+   public Object getScanDataLock() {
+     return StorageManagerV2.this.scanDataLock;
+   }
+
+   /** @return the live scan queue (not a copy) */
+   public Queue<FileScannerV2> getScanQueue() {
+     return StorageManagerV2.this.scanQueue;
+   }
+
+   /** @return the configured value of STORAGE_MANAGER_DISK_SCHEDULER_MAX_READ_BYTES_PER_SLOT */
+   public int getMaxReadBytesPerScheduleSlot() {
+     final int maxBytes =
+         conf.getIntVar(TajoConf.ConfVars.STORAGE_MANAGER_DISK_SCHEDULER_MAX_READ_BYTES_PER_SLOT);
+     return maxBytes;
+   }
+
+   /** Delegates to the enclosing manager's requestFileScan. */
+   public void requestFileScan(FileScannerV2 fileScanner) {
+     StorageManagerV2.this.requestFileScan(fileScanner);
+   }
+
+   /** @return the configuration of the enclosing manager */
+   public TajoConf getConf() {
+     return conf;
+   }
+
+   /** Forwards the read statistics to the scan scheduler. */
+   public void incrementReadBytes(int diskId, long[] readBytes) {
+     scanScheduler.incrementReadBytes(diskId, readBytes);
+   }
+ }
+
+ /** Stops the scan scheduler if one was created. */
+ public void stop() {
+   if(scanScheduler == null) {
+     return;
+   }
+   scanScheduler.stopScheduler();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/main/proto/IndexProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/proto/IndexProtos.proto b/tajo-storage/src/main/proto/IndexProtos.proto
new file mode 100644
index 0000000..bcb0cbe
--- /dev/null
+++ b/tajo-storage/src/main/proto/IndexProtos.proto
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.tajo.index";
+option java_outer_classname = "IndexProtos";
+option optimize_for = SPEED;
+option java_generic_services = false;
+option java_generate_equals_and_hash = true;
+
+import "CatalogProtos.proto";
+
+// Holds a repeated list of TupleComparatorSpecProto entries.
+// TupleComparatorSpecProto is not defined in this file; presumably it comes
+// from the imported CatalogProtos.proto (confirm).
+message TupleComparatorProto {
+ repeated TupleComparatorSpecProto compSpecs = 1;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/main/resources/storage-default.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/resources/storage-default.xml b/tajo-storage/src/main/resources/storage-default.xml
new file mode 100644
index 0000000..5bf4453
--- /dev/null
+++ b/tajo-storage/src/main/resources/storage-default.xml
@@ -0,0 +1,149 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+
+<configuration>
+ <property>
+ <name>tajo.storage.manager.v2</name>
+ <value>false</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.manager.maxReadBytes</name>
+ <value>8388608</value>
+ <description></description>
+ </property>
+
+ <property>
+ <name>tajo.storage.manager.concurrency.perDisk</name>
+ <value>1</value>
+ <description></description>
+ </property>
+
+ <!--- Registered Scanner Handler -->
+ <property>
+ <name>tajo.storage.scanner-handler</name>
+ <value>csv,raw,rcfile,row,trevni</value>
+ </property>
+
+ <!--- Fragment Class Configurations -->
+ <property>
+ <name>tajo.storage.fragment.csv.class</name>
+ <value>org.apache.tajo.storage.fragment.FileFragment</value>
+ </property>
+ <property>
+ <name>tajo.storage.fragment.raw.class</name>
+ <value>org.apache.tajo.storage.fragment.FileFragment</value>
+ </property>
+ <property>
+ <name>tajo.storage.fragment.rcfile.class</name>
+ <value>org.apache.tajo.storage.fragment.FileFragment</value>
+ </property>
+ <property>
+ <name>tajo.storage.fragment.row.class</name>
+ <value>org.apache.tajo.storage.fragment.FileFragment</value>
+ </property>
+ <property>
+ <name>tajo.storage.fragment.trevni.class</name>
+ <value>org.apache.tajo.storage.fragment.FileFragment</value>
+ </property>
+
+ <!--- Scanner Handler -->
+ <property>
+ <name>tajo.storage.scanner-handler.csv.class</name>
+ <value>org.apache.tajo.storage.CSVFile$CSVScanner</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.scanner-handler.v2.csv.class</name>
+ <value>org.apache.tajo.storage.v2.CSVFileScanner</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.scanner-handler.raw.class</name>
+ <value>org.apache.tajo.storage.RawFile$RawFileScanner</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.scanner-handler.v2.raw.class</name>
+ <value>org.apache.tajo.storage.RawFile$RawFileScanner</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.scanner-handler.rcfile.class</name>
+ <value>org.apache.tajo.storage.rcfile.RCFile$RCFileScanner</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.scanner-handler.v2.rcfile.class</name>
+ <value>org.apache.tajo.storage.v2.RCFileScanner</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.scanner-handler.rowfile.class</name>
+ <value>org.apache.tajo.storage.RowFile$RowFileScanner</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.scanner-handler.v2.rowfile.class</name>
+ <value>org.apache.tajo.storage.RowFile$RowFileScanner</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.scanner-handler.trevni.class</name>
+ <value>org.apache.tajo.storage.trevni.TrevniScanner</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.scanner-handler.v2.trevni.class</name>
+ <value>org.apache.tajo.storage.trevni.TrevniScanner</value>
+ </property>
+
+ <!--- Appender Handler -->
+ <property>
+ <name>tajo.storage.appender-handler</name>
+ <value>csv,raw,rcfile,row,trevni</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.appender-handler.csv.class</name>
+ <value>org.apache.tajo.storage.CSVFile$CSVAppender</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.appender-handler.raw.class</name>
+ <value>org.apache.tajo.storage.RawFile$RawFileAppender</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.appender-handler.rcfile.class</name>
+ <value>org.apache.tajo.storage.rcfile.RCFile$RCFileAppender</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.appender-handler.rowfile.class</name>
+ <value>org.apache.tajo.storage.RowFile$RowFileAppender</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.appender-handler.trevni.class</name>
+ <value>org.apache.tajo.storage.trevni.TrevniAppender</value>
+ </property>
+</configuration>
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
new file mode 100644
index 0000000..bec1556
--- /dev/null
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
@@ -0,0 +1,233 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.*;
+import org.apache.hadoop.io.compress.zlib.ZlibFactory;
+import org.apache.hadoop.util.NativeCodeLoader;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(Parameterized.class)
+public class TestCompressionStorages {
+ private TajoConf conf;
+ private static String TEST_PATH = "target/test-data/TestCompressionStorages";
+
+ private StoreType storeType;
+ private Path testDir;
+ private FileSystem fs;
+
+ /**
+  * Invoked by the parameterized runner once per store type; prepares the
+  * configuration, the test directory and its file system.
+  */
+ public TestCompressionStorages(StoreType type) throws IOException {
+   this.storeType = type;
+   this.conf = new TajoConf();
+
+   this.testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+   this.fs = this.testDir.getFileSystem(this.conf);
+ }
+
+ /** @return the store types every test in this class is run against */
+ @Parameterized.Parameters
+ public static Collection<Object[]> generateParameters() {
+   final Object[][] storeTypes = {
+       {StoreType.CSV},
+       {StoreType.RCFILE}
+   };
+   return Arrays.asList(storeTypes);
+ }
+
+ /** Write/read round trip using the Deflate codec. */
+ @Test
+ public void testDeflateCodecCompressionData() throws IOException {
+   final Class<? extends CompressionCodec> codec = DeflateCodec.class;
+   storageCompressionTest(storeType, codec);
+ }
+
+ /**
+  * Write/read round trip using the Gzip codec. For RCFILE the test only runs
+  * when native zlib is loaded.
+  */
+ @Test
+ public void testGzipCodecCompressionData() throws IOException {
+   // The native check is only evaluated for RCFILE, as before.
+   if (storeType != StoreType.RCFILE || ZlibFactory.isNativeZlibLoaded(conf)) {
+     storageCompressionTest(storeType, GzipCodec.class);
+   }
+ }
+
+ /** Write/read round trip using the Snappy codec; skipped without native code. */
+ @Test
+ public void testSnappyCodecCompressionData() throws IOException {
+   if (!SnappyCodec.isNativeCodeLoaded()) {
+     return;
+   }
+   storageCompressionTest(storeType, SnappyCodec.class);
+ }
+
+ /** Write/read round trip using the BZip2 codec. */
+ @Test
+ public void testBzip2CodecCompressionData() throws IOException {
+   final Class<? extends CompressionCodec> codec = BZip2Codec.class;
+   storageCompressionTest(storeType, codec);
+ }
+
+ /** Write/read round trip using the LZ4 codec; skipped without native code. */
+ @Test
+ public void testLz4CodecCompressionData() throws IOException {
+   if (!NativeCodeLoader.isNativeCodeLoaded() || !Lz4Codec.isNativeCodeLoaded()) {
+     return;
+   }
+   storageCompressionTest(storeType, Lz4Codec.class);
+ }
+
+ /**
+  * Writes a BZip2-compressed CSV table, splits the file into two fragments at
+  * a random offset and checks that scanning both fragments yields every row
+  * exactly once. Only runs for the CSV store type.
+  */
+ @Test
+ public void testSplitCompressionData() throws IOException {
+   if (StoreType.CSV != storeType) {
+     return;
+   }
+
+   Schema schema = new Schema();
+   schema.addColumn("id", Type.INT4);
+   schema.addColumn("age", Type.INT8);
+
+   TableMeta meta = CatalogUtil.newTableMeta(storeType);
+   meta.putOption("compression.codec", BZip2Codec.class.getCanonicalName());
+
+   Path tablePath = new Path(testDir, "SplitCompression");
+   Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
+   appender.enableStats();
+   appender.init();
+
+   String fileExtension = "";
+   if (appender instanceof CSVFile.CSVAppender) {
+     fileExtension = ((CSVFile.CSVAppender) appender).getExtension();
+   }
+
+   final int tupleNum = 100000;
+   for (int row = 0; row < tupleNum; row++) {
+     VTuple written = new VTuple(2);
+     written.put(0, DatumFactory.createInt4(row + 1));
+     written.put(1, DatumFactory.createInt8(25L));
+     appender.addTuple(written);
+   }
+   appender.close();
+
+   TableStats stat = appender.getStats();
+   assertEquals(tupleNum, stat.getNumRows().longValue());
+   tablePath = tablePath.suffix(fileExtension);
+
+   final long fileLen = fs.getFileStatus(tablePath).getLen();
+   // Random split point in [1, fileLen].
+   final long splitAt = (long) (Math.random() * fileLen) + 1;
+
+   final FileFragment head = new FileFragment("SplitCompression", tablePath, 0, splitAt);
+   final FileFragment tail = new FileFragment("SplitCompression", tablePath, splitAt, (fileLen - splitAt));
+
+   int tupleCnt = 0;
+
+   Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema, head, schema);
+   assertTrue(scanner.isSplittable());
+   scanner.init();
+   while (scanner.next() != null) {
+     tupleCnt++;
+   }
+   scanner.close();
+
+   scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema, tail, schema);
+   assertTrue(scanner.isSplittable());
+   scanner.init();
+   while (scanner.next() != null) {
+     tupleCnt++;
+   }
+   scanner.close();
+
+   assertEquals(tupleNum, tupleCnt);
+ }
+
+ private void storageCompressionTest(StoreType storeType, Class<? extends CompressionCodec> codec) throws IOException {
+ Schema schema = new Schema();
+ schema.addColumn("id", Type.INT4);
+ schema.addColumn("age", Type.FLOAT4);
+ schema.addColumn("name", Type.TEXT);
+
+ TableMeta meta = CatalogUtil.newTableMeta(storeType);
+ meta.putOption("compression.codec", codec.getCanonicalName());
+ meta.putOption("rcfile.serde", TextSerializerDeserializer.class.getName());
+
+ String fileName = "Compression_" + codec.getSimpleName();
+ Path tablePath = new Path(testDir, fileName);
+ Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
+ appender.enableStats();
+
+ appender.init();
+
+ String extension = "";
+ if (appender instanceof CSVFile.CSVAppender) {
+ extension = ((CSVFile.CSVAppender) appender).getExtension();
+ }
+
+ int tupleNum = 100000;
+ VTuple vTuple;
+
+ for (int i = 0; i < tupleNum; i++) {
+ vTuple = new VTuple(3);
+ vTuple.put(0, DatumFactory.createInt4(i + 1));
+ vTuple.put(1, DatumFactory.createFloat4((float) i));
+ vTuple.put(2, DatumFactory.createText(String.valueOf(i)));
+ appender.addTuple(vTuple);
+ }
+ appender.close();
+
+ TableStats stat = appender.getStats();
+ assertEquals(tupleNum, stat.getNumRows().longValue());
+ tablePath = tablePath.suffix(extension);
+ FileStatus status = fs.getFileStatus(tablePath);
+ long fileLen = status.getLen();
+ FileFragment[] tablets = new FileFragment[1];
+ tablets[0] = new FileFragment(fileName, tablePath, 0, fileLen);
+
+ Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema, tablets[0], schema);
+
+ if (StoreType.CSV == storeType) {
+ if (SplittableCompressionCodec.class.isAssignableFrom(codec)) {
+ assertTrue(scanner.isSplittable());
+ } else {
+ assertFalse(scanner.isSplittable());
+ }
+ }
+ scanner.init();
+ int tupleCnt = 0;
+ Tuple tuple;
+ while ((tuple = scanner.next()) != null) {
+ tupleCnt++;
+ }
+ scanner.close();
+ assertEquals(tupleNum, tupleCnt);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/test/java/org/apache/tajo/storage/TestFrameTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestFrameTuple.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestFrameTuple.java
new file mode 100644
index 0000000..387fed5
--- /dev/null
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestFrameTuple.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestFrameTuple {
+ private Tuple tuple1;
+ private Tuple tuple2;
+
+ @Before
+ public void setUp() throws Exception {
+ tuple1 = new VTuple(11);
+ tuple1.put(new Datum[] {
+ DatumFactory.createBool(true),
+ DatumFactory.createBit((byte) 0x99),
+ DatumFactory.createChar('9'),
+ DatumFactory.createInt2((short) 17),
+ DatumFactory.createInt4(59),
+ DatumFactory.createInt8(23l),
+ DatumFactory.createFloat4(77.9f),
+ DatumFactory.createFloat8(271.9f),
+ DatumFactory.createText("hyunsik"),
+ DatumFactory.createBlob("hyunsik".getBytes()),
+ DatumFactory.createInet4("192.168.0.1")
+ });
+
+ tuple2 = new VTuple(11);
+ tuple2.put(new Datum[] {
+ DatumFactory.createBool(true),
+ DatumFactory.createBit((byte) 0x99),
+ DatumFactory.createChar('9'),
+ DatumFactory.createInt2((short) 17),
+ DatumFactory.createInt4(59),
+ DatumFactory.createInt8(23l),
+ DatumFactory.createFloat4(77.9f),
+ DatumFactory.createFloat8(271.9f),
+ DatumFactory.createText("hyunsik"),
+ DatumFactory.createBlob("hyunsik".getBytes()),
+ DatumFactory.createInet4("192.168.0.1")
+ });
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ }
+
+ @Test
+ public final void testFrameTuple() {
+ Tuple frame = new FrameTuple(tuple1, tuple2);
+ assertEquals(22, frame.size());
+ for (int i = 0; i < 22; i++) {
+ assertTrue(frame.contains(i));
+ }
+
+ assertEquals(DatumFactory.createInt8(23l), frame.get(5));
+ assertEquals(DatumFactory.createInt8(23l), frame.get(16));
+ assertEquals(DatumFactory.createInet4("192.168.0.1"), frame.get(10));
+ assertEquals(DatumFactory.createInet4("192.168.0.1"), frame.get(21));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/test/java/org/apache/tajo/storage/TestLazyTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestLazyTuple.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestLazyTuple.java
new file mode 100644
index 0000000..e1430e1
--- /dev/null
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestLazyTuple.java
@@ -0,0 +1,258 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.util.Bytes;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+ /**
+  * Unit tests for {@link LazyTuple}: field access, containment, put, equality,
+  * hashing, nested tuple puts, invalid numeric text and cloning.
+  */
+ public class TestLazyTuple {
+
+   Schema schema;
+   byte[][] textRow;
+   byte[] nullbytes;
+   SerializerDeserializer serde;
+
+   /**
+    * Builds a 13-column schema and a matching '|'-delimited text row, then
+    * splits the row into one byte array per column.
+    */
+   @Before
+   public void setUp() {
+     nullbytes = "\\N".getBytes();
+
+     schema = new Schema();
+     schema.addColumn("col1", TajoDataTypes.Type.BOOLEAN);
+     schema.addColumn("col2", TajoDataTypes.Type.BIT);
+     schema.addColumn("col3", TajoDataTypes.Type.CHAR, 7);
+     schema.addColumn("col4", TajoDataTypes.Type.INT2);
+     schema.addColumn("col5", TajoDataTypes.Type.INT4);
+     schema.addColumn("col6", TajoDataTypes.Type.INT8);
+     schema.addColumn("col7", TajoDataTypes.Type.FLOAT4);
+     schema.addColumn("col8", TajoDataTypes.Type.FLOAT8);
+     schema.addColumn("col9", TajoDataTypes.Type.TEXT);
+     schema.addColumn("col10", TajoDataTypes.Type.BLOB);
+     schema.addColumn("col11", TajoDataTypes.Type.INET4);
+     schema.addColumn("col12", TajoDataTypes.Type.INT4);
+     schema.addColumn("col13", TajoDataTypes.Type.NULL_TYPE);
+
+     StringBuilder sb = new StringBuilder();
+     sb.append(DatumFactory.createBool(true)).append('|');
+     sb.append(new String(DatumFactory.createBit((byte) 0x99).asTextBytes())).append('|');
+     sb.append(DatumFactory.createChar("str")).append('|');
+     sb.append(DatumFactory.createInt2((short) 17)).append('|');
+     sb.append(DatumFactory.createInt4(59)).append('|');
+     sb.append(DatumFactory.createInt8(23L)).append('|');
+     sb.append(DatumFactory.createFloat4(77.9f)).append('|');
+     sb.append(DatumFactory.createFloat8(271.9f)).append('|');
+     sb.append(DatumFactory.createText("str2")).append('|');
+     sb.append(DatumFactory.createBlob("jinho".getBytes())).append('|');
+     sb.append(DatumFactory.createInet4("192.168.0.1")).append('|');
+     // col12 carries the configured null marker, col13 the text form of NullDatum.
+     sb.append(new String(nullbytes)).append('|');
+     sb.append(NullDatum.get());
+     textRow = Bytes.splitPreserveAllTokens(sb.toString().getBytes(), '|');
+     serde = new TextSerializerDeserializer();
+   }
+
+   /** Every column of the text row must read back as the datum it was written from. */
+   @Test
+   public void testGetDatum() {
+
+     LazyTuple t1 = new LazyTuple(schema, textRow, -1, nullbytes, serde);
+     assertEquals(DatumFactory.createBool(true), t1.get(0));
+     assertEquals(DatumFactory.createBit((byte) 0x99), t1.get(1));
+     assertEquals(DatumFactory.createChar("str"), t1.get(2));
+     assertEquals(DatumFactory.createInt2((short) 17), t1.get(3));
+     assertEquals(DatumFactory.createInt4(59), t1.get(4));
+     assertEquals(DatumFactory.createInt8(23L), t1.get(5));
+     assertEquals(DatumFactory.createFloat4(77.9f), t1.get(6));
+     assertEquals(DatumFactory.createFloat8(271.9f), t1.get(7));
+     assertEquals(DatumFactory.createText("str2"), t1.get(8));
+     assertEquals(DatumFactory.createBlob("jinho".getBytes()), t1.get(9));
+     assertEquals(DatumFactory.createInet4("192.168.0.1"), t1.get(10));
+     assertEquals(NullDatum.get(), t1.get(11));
+     assertEquals(NullDatum.get(), t1.get(12));
+   }
+
+   /** Only the indexes that were explicitly put must be reported as contained. */
+   @Test
+   public void testContain() {
+     int colNum = schema.getColumnNum();
+
+     LazyTuple t1 = new LazyTuple(schema, new byte[colNum][], -1);
+     t1.put(0, DatumFactory.createInt4(1));
+     t1.put(3, DatumFactory.createInt4(1));
+     t1.put(7, DatumFactory.createInt4(1));
+
+     assertTrue(t1.contains(0));
+     assertFalse(t1.contains(1));
+     assertFalse(t1.contains(2));
+     assertTrue(t1.contains(3));
+     assertFalse(t1.contains(4));
+     assertFalse(t1.contains(5));
+     assertFalse(t1.contains(6));
+     assertTrue(t1.contains(7));
+     assertFalse(t1.contains(8));
+     assertFalse(t1.contains(9));
+     assertFalse(t1.contains(10));
+     assertFalse(t1.contains(11));
+     assertFalse(t1.contains(12));
+   }
+
+   /** Values stored with put must be readable through the typed getters. */
+   @Test
+   public void testPut() {
+     int colNum = schema.getColumnNum();
+     LazyTuple t1 = new LazyTuple(schema, new byte[colNum][], -1);
+     t1.put(0, DatumFactory.createText("str"));
+     t1.put(1, DatumFactory.createInt4(2));
+     t1.put(11, DatumFactory.createFloat4(0.76f));
+
+     assertTrue(t1.contains(0));
+     assertTrue(t1.contains(1));
+
+     // JUnit's assertEquals takes (expected, actual).
+     assertEquals("str", t1.getString(0).toString());
+     assertEquals(2, t1.getInt(1).asInt4());
+     assertTrue(t1.getFloat(11).asFloat4() == 0.76f);
+   }
+
+   /**
+    * Tuples with the same values must be equal, including a LazyTuple compared
+    * with a VTuple; an empty tuple must not equal a populated one.
+    */
+   @Test
+   public void testEquals() {
+     int colNum = schema.getColumnNum();
+     LazyTuple t1 = new LazyTuple(schema, new byte[colNum][], -1);
+     LazyTuple t2 = new LazyTuple(schema, new byte[colNum][], -1);
+
+     t1.put(0, DatumFactory.createInt4(1));
+     t1.put(1, DatumFactory.createInt4(2));
+     t1.put(3, DatumFactory.createInt4(2));
+
+     t2.put(0, DatumFactory.createInt4(1));
+     t2.put(1, DatumFactory.createInt4(2));
+     t2.put(3, DatumFactory.createInt4(2));
+
+     assertEquals(t1, t2);
+
+     Tuple t3 = new VTuple(colNum);
+     t3.put(0, DatumFactory.createInt4(1));
+     t3.put(1, DatumFactory.createInt4(2));
+     t3.put(3, DatumFactory.createInt4(2));
+     assertEquals(t1, t3);
+     assertEquals(t2, t3);
+
+     LazyTuple t4 = new LazyTuple(schema, new byte[colNum][], -1);
+     // assertNotSame only compares references and can never fail for two
+     // distinct objects; check value inequality instead.
+     assertFalse(t1.equals(t4));
+   }
+
+   /**
+    * Equal tuples must produce equal hash codes (LazyTuple and VTuple alike);
+    * a tuple with different contents is expected to hash differently.
+    */
+   @Test
+   public void testHashCode() {
+     int colNum = schema.getColumnNum();
+     LazyTuple t1 = new LazyTuple(schema, new byte[colNum][], -1);
+     LazyTuple t2 = new LazyTuple(schema, new byte[colNum][], -1);
+
+     t1.put(0, DatumFactory.createInt4(1));
+     t1.put(1, DatumFactory.createInt4(2));
+     t1.put(3, DatumFactory.createInt4(2));
+     t1.put(4, DatumFactory.createText("str"));
+
+     t2.put(0, DatumFactory.createInt4(1));
+     t2.put(1, DatumFactory.createInt4(2));
+     t2.put(3, DatumFactory.createInt4(2));
+     t2.put(4, DatumFactory.createText("str"));
+
+     assertEquals(t1.hashCode(), t2.hashCode());
+
+     Tuple t3 = new VTuple(colNum);
+     t3.put(0, DatumFactory.createInt4(1));
+     t3.put(1, DatumFactory.createInt4(2));
+     t3.put(3, DatumFactory.createInt4(2));
+     t3.put(4, DatumFactory.createText("str"));
+     assertEquals(t1.hashCode(), t3.hashCode());
+     assertEquals(t2.hashCode(), t3.hashCode());
+
+     Tuple t4 = new VTuple(5);
+     t4.put(0, DatumFactory.createInt4(1));
+     t4.put(1, DatumFactory.createInt4(2));
+     t4.put(4, DatumFactory.createInt4(2));
+
+     // assertNotSame on two ints compares boxed Integer references rather than
+     // the values; compare the primitives so the check is meaningful.
+     assertTrue(t1.hashCode() != t4.hashCode());
+   }
+
+   /** Putting a tuple at an offset must copy its fields into consecutive slots. */
+   @Test
+   public void testPutTuple() {
+     int colNum = schema.getColumnNum();
+     LazyTuple t1 = new LazyTuple(schema, new byte[colNum][], -1);
+
+     t1.put(0, DatumFactory.createInt4(1));
+     t1.put(1, DatumFactory.createInt4(2));
+     t1.put(2, DatumFactory.createInt4(3));
+
+
+     Schema schema2 = new Schema();
+     schema2.addColumn("col1", TajoDataTypes.Type.INT8);
+     schema2.addColumn("col2", TajoDataTypes.Type.INT8);
+
+     LazyTuple t2 = new LazyTuple(schema2, new byte[schema2.getColumnNum()][], -1);
+     t2.put(0, DatumFactory.createInt4(4));
+     t2.put(1, DatumFactory.createInt4(5));
+
+     t1.put(3, t2);
+
+     // Slots 0..4 now hold 1..5: three direct puts followed by t2's two fields.
+     for (int i = 0; i < 5; i++) {
+       assertEquals(i + 1, t1.get(i).asInt4());
+     }
+   }
+
+   /** Numeric columns whose text is padded with blanks or empty must read as NullDatum. */
+   @Test
+   public void testInvalidNumber() {
+     byte[][] bytes = Bytes.splitPreserveAllTokens(" 1| |2 ||".getBytes(), '|');
+     // Local schema, named so that it does not shadow the fixture field.
+     Schema numericSchema = new Schema();
+     numericSchema.addColumn("col1", TajoDataTypes.Type.INT2);
+     numericSchema.addColumn("col2", TajoDataTypes.Type.INT4);
+     numericSchema.addColumn("col3", TajoDataTypes.Type.INT8);
+     numericSchema.addColumn("col4", TajoDataTypes.Type.FLOAT4);
+     numericSchema.addColumn("col5", TajoDataTypes.Type.FLOAT8);
+
+     LazyTuple tuple = new LazyTuple(numericSchema, bytes, 0);
+     assertEquals(bytes.length, tuple.size());
+
+     for (int i = 0; i < tuple.size(); i++) {
+       assertEquals(NullDatum.get(), tuple.get(i));
+     }
+   }
+
+   /**
+    * A clone must be a distinct but equal object that shares the stored datum
+    * instances, and clearing the original must not affect the clone.
+    */
+   @Test
+   public void testClone() throws CloneNotSupportedException {
+     int colNum = schema.getColumnNum();
+     LazyTuple t1 = new LazyTuple(schema, new byte[colNum][], -1);
+
+     t1.put(0, DatumFactory.createInt4(1));
+     t1.put(1, DatumFactory.createInt4(2));
+     t1.put(3, DatumFactory.createInt4(2));
+     t1.put(4, DatumFactory.createText("str"));
+
+     LazyTuple t2 = (LazyTuple) t1.clone();
+     assertNotSame(t1, t2);
+     assertEquals(t1, t2);
+
+     assertSame(t1.get(4), t2.get(4));
+
+     t1.clear();
+     assertFalse(t1.equals(t2));
+   }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestMergeScanner.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
new file mode 100644
index 0000000..f2a66d9
--- /dev/null
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Options;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.TUtil;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+
+import static org.junit.Assert.*;
+
+@RunWith(Parameterized.class)
+public class TestMergeScanner {
+ private TajoConf conf;
+ AbstractStorageManager sm;
+ private static String TEST_PATH = "target/test-data/TestMergeScanner";
+ private Path testDir;
+ private StoreType storeType;
+ private FileSystem fs;
+
+ public TestMergeScanner(StoreType storeType) {
+ this.storeType = storeType;
+ }
+
+ @Parameters
+ public static Collection<Object[]> generateParameters() {
+ return Arrays.asList(new Object[][] {
+ {StoreType.CSV},
+ {StoreType.RAW},
+ {StoreType.RCFILE},
+ {StoreType.TREVNI},
+ // RowFile requires Byte-buffer read support, so we omitted RowFile.
+ //{StoreType.ROWFILE},
+
+ });
+ }
+
+ @Before
+ public void setup() throws Exception {
+ conf = new TajoConf();
+ conf.setVar(ConfVars.ROOT_DIR, TEST_PATH);
+ conf.setStrings("tajo.storage.projectable-scanner", "rcfile", "trevni");
+ testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+ fs = testDir.getFileSystem(conf);
+ sm = StorageManagerFactory.getStorageManager(conf, testDir);
+ }
+
+ @Test
+ public void testMultipleFiles() throws IOException {
+ Schema schema = new Schema();
+ schema.addColumn("id", Type.INT4);
+ schema.addColumn("file", Type.TEXT);
+ schema.addColumn("name", Type.TEXT);
+ schema.addColumn("age", Type.INT8);
+
+ Options options = new Options();
+ TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
+
+ Path table1Path = new Path(testDir, storeType + "_1.data");
+ Appender appender1 = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, table1Path);
+ appender1.enableStats();
+ appender1.init();
+ int tupleNum = 10000;
+ VTuple vTuple;
+
+ for(int i = 0; i < tupleNum; i++) {
+ vTuple = new VTuple(4);
+ vTuple.put(0, DatumFactory.createInt4(i + 1));
+ vTuple.put(1, DatumFactory.createText("hyunsik"));
+ vTuple.put(2, DatumFactory.createText("jihoon"));
+ vTuple.put(3, DatumFactory.createInt8(25l));
+ appender1.addTuple(vTuple);
+ }
+ appender1.close();
+
+ TableStats stat1 = appender1.getStats();
+ if (stat1 != null) {
+ assertEquals(tupleNum, stat1.getNumRows().longValue());
+ }
+
+ Path table2Path = new Path(testDir, storeType + "_2.data");
+ Appender appender2 = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, table2Path);
+ appender2.enableStats();
+ appender2.init();
+
+ for(int i = 0; i < tupleNum; i++) {
+ vTuple = new VTuple(4);
+ vTuple.put(0, DatumFactory.createInt4(i + 1));
+ vTuple.put(1, DatumFactory.createText("hyunsik"));
+ vTuple.put(2, DatumFactory.createText("jihoon"));
+ vTuple.put(3, DatumFactory.createInt8(25l));
+ appender2.addTuple(vTuple);
+ }
+ appender2.close();
+
+ TableStats stat2 = appender2.getStats();
+ if (stat2 != null) {
+ assertEquals(tupleNum, stat2.getNumRows().longValue());
+ }
+
+
+ FileStatus status1 = fs.getFileStatus(table1Path);
+ FileStatus status2 = fs.getFileStatus(table2Path);
+ FileFragment[] fragment = new FileFragment[2];
+ fragment[0] = new FileFragment("tablet1", table1Path, 0, status1.getLen());
+ fragment[1] = new FileFragment("tablet1", table2Path, 0, status2.getLen());
+
+ Schema targetSchema = new Schema();
+ targetSchema.addColumn(schema.getColumn(0));
+ targetSchema.addColumn(schema.getColumn(2));
+
+ Scanner scanner = new MergeScanner(conf, schema, meta, TUtil.<FileFragment>newList(fragment), targetSchema);
+ assertEquals(isProjectableStorage(meta.getStoreType()), scanner.isProjectable());
+
+ scanner.init();
+ int totalCounts = 0;
+ Tuple tuple;
+ while ((tuple=scanner.next()) != null) {
+ totalCounts++;
+ if (isProjectableStorage(meta.getStoreType())) {
+ assertNotNull(tuple.get(0));
+ assertNull(tuple.get(1));
+ assertNotNull(tuple.get(2));
+ assertNull(tuple.get(3));
+ }
+ }
+ scanner.close();
+
+ assertEquals(tupleNum * 2, totalCounts);
+ }
+
+ private static boolean isProjectableStorage(StoreType type) {
+ switch (type) {
+ case RCFILE:
+ case TREVNI:
+ case CSV:
+ return true;
+ default:
+ return false;
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorageManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorageManager.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorageManager.java
new file mode 100644
index 0000000..083670a
--- /dev/null
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorageManager.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+
+ /**
+  * Smoke test for obtaining an appender and a file scanner from the storage
+  * manager: writes four CSV rows and reads them back.
+  */
+ public class TestStorageManager {
+   private TajoConf conf;
+   private static String TEST_PATH = "target/test-data/TestStorageManager";
+   AbstractStorageManager sm = null;
+   private Path testDir;
+   private FileSystem fs;
+
+   /** Creates the configuration, the test directory and the storage manager. */
+   @Before
+   public void setUp() throws Exception {
+     conf = new TajoConf();
+     testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+     fs = testDir.getFileSystem(conf);
+     sm = StorageManagerFactory.getStorageManager(conf, testDir);
+   }
+
+   @After
+   public void tearDown() throws Exception {
+   }
+
+   /**
+    * Writes four tuples to a CSV file and checks that a scanner over the same
+    * path returns exactly that many tuples.
+    *
+    * @throws IOException if writing or reading the table file fails
+    */
+   @Test
+   public final void testGetScannerAndAppender() throws IOException {
+     Schema schema = new Schema();
+     schema.addColumn("id", Type.INT4);
+     schema.addColumn("age", Type.INT4);
+     schema.addColumn("name", Type.TEXT);
+
+     TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV);
+
+     Tuple[] tuples = new Tuple[4];
+     for (int i = 0; i < tuples.length; i++) {
+       tuples[i] = new VTuple(3);
+       tuples[i].put(new Datum[] {
+           DatumFactory.createInt4(i),
+           DatumFactory.createInt4(i + 32),
+           DatumFactory.createText("name" + i)});
+     }
+
+     Path path = StorageUtil.concatPath(testDir, "testGetScannerAndAppender", "table.csv");
+     fs.mkdirs(path.getParent());
+     Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, path);
+     appender.init();
+     for (Tuple t : tuples) {
+       appender.addTuple(t);
+     }
+     appender.close();
+
+     Scanner scanner = StorageManagerFactory.getStorageManager(conf).getFileScanner(meta, schema, path);
+     int i = 0;
+     try {
+       scanner.init();
+       while (scanner.next() != null) {
+         i++;
+       }
+     } finally {
+       // The scanner was previously never closed; release it even when the scan fails.
+       scanner.close();
+     }
+     assertEquals(tuples.length, i);
+   }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
new file mode 100644
index 0000000..16b370c
--- /dev/null
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
@@ -0,0 +1,375 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.TajoIdProtos;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Options;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.datum.ProtobufDatumFactory;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.rcfile.RCFile;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(Parameterized.class)
+public class TestStorages {
+ private TajoConf conf;
+ private static String TEST_PATH = "target/test-data/TestStorages";
+
+ private StoreType storeType;
+ private boolean splitable;
+ private boolean statsable;
+ private Path testDir;
+ private FileSystem fs;
+
+ /**
+  * Creates one parameterized test instance.
+  *
+  * @param type store type under test
+  * @param splitable whether the split test should run for this store type
+  * @param statsable stored as-is in the {@code statsable} field
+  * @throws IOException if the test directory or its file system cannot be obtained
+  */
+ public TestStorages(StoreType type, boolean splitable, boolean statsable) throws IOException {
+   conf = new TajoConf();
+   if (StoreType.RCFILE == type) {
+     // RCFile only: set the record interval to 100.
+     conf.setInt(RCFile.RECORD_INTERVAL_CONF_STR, 100);
+   }
+
+   this.storeType = type;
+   this.splitable = splitable;
+   this.statsable = statsable;
+
+   testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+   fs = testDir.getFileSystem(conf);
+ }
+
+ @Parameterized.Parameters
+ public static Collection<Object[]> generateParameters() {
+ return Arrays.asList(new Object[][] {
+ {StoreType.CSV, true, true},
+ {StoreType.RAW, false, false},
+ {StoreType.RCFILE, true, true},
+ {StoreType.TREVNI, false, true},
+ });
+ }
+
+ /**
+  * For splittable store types: writes 10000 rows, cuts the file at a random
+  * offset into two fragments and checks that scanning both fragments yields
+  * every row exactly once. Does nothing for non-splittable store types.
+  */
+ @Test
+ public void testSplitable() throws IOException {
+   if (!splitable) {
+     return;
+   }
+
+   Schema schema = new Schema();
+   schema.addColumn("id", Type.INT4);
+   schema.addColumn("age", Type.INT8);
+
+   TableMeta meta = CatalogUtil.newTableMeta(storeType);
+   Path tablePath = new Path(testDir, "Splitable.data");
+   Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
+   appender.enableStats();
+   appender.init();
+
+   final int tupleNum = 10000;
+   for (int id = 1; id <= tupleNum; id++) {
+     VTuple row = new VTuple(2);
+     row.put(0, DatumFactory.createInt4(id));
+     row.put(1, DatumFactory.createInt8(25L));
+     appender.addTuple(row);
+   }
+   appender.close();
+   TableStats stat = appender.getStats();
+   assertEquals(tupleNum, stat.getNumRows().longValue());
+
+   long fileLen = fs.getFileStatus(tablePath).getLen();
+   // Random split point in [1, fileLen].
+   long randomNum = (long) (Math.random() * fileLen) + 1;
+
+   FileFragment[] tablets = {
+       new FileFragment("Splitable", tablePath, 0, randomNum),
+       new FileFragment("Splitable", tablePath, randomNum, (fileLen - randomNum))
+   };
+
+   int tupleCnt = 0;
+   for (FileFragment tablet : tablets) {
+     Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema, tablet, schema);
+     assertTrue(scanner.isSplittable());
+     scanner.init();
+     while (scanner.next() != null) {
+       tupleCnt++;
+     }
+     scanner.close();
+   }
+
+   assertEquals(tupleNum, tupleCnt);
+ }
+
+ /**
+  * Writes 10000 three-column rows and scans them with a target schema of
+  * {age, score}; the projected values must match what was written, and for
+  * RCFILE, TREVNI and CSV the unprojected first column must be null.
+  */
+ @Test
+ public void testProjection() throws IOException {
+   Schema schema = new Schema();
+   schema.addColumn("id", Type.INT4);
+   schema.addColumn("age", Type.INT8);
+   schema.addColumn("score", Type.FLOAT4);
+
+   TableMeta meta = CatalogUtil.newTableMeta(storeType);
+
+   Path tablePath = new Path(testDir, "testProjection.data");
+   Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
+   appender.init();
+
+   final int tupleNum = 10000;
+   for (int i = 0; i < tupleNum; i++) {
+     VTuple row = new VTuple(3);
+     row.put(0, DatumFactory.createInt4(i + 1));
+     row.put(1, DatumFactory.createInt8(i + 2));
+     row.put(2, DatumFactory.createFloat4(i + 3));
+     appender.addTuple(row);
+   }
+   appender.close();
+
+   FileStatus status = fs.getFileStatus(tablePath);
+   FileFragment fragment = new FileFragment("testReadAndWrite", tablePath, 0, status.getLen());
+
+   Schema target = new Schema();
+   target.addColumn("age", Type.INT8);
+   target.addColumn("score", Type.FLOAT4);
+   Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema, fragment, target);
+   scanner.init();
+
+   // Store types for which the test expects the unprojected column to be absent.
+   boolean expectsNullId =
+       storeType == StoreType.RCFILE || storeType == StoreType.TREVNI || storeType == StoreType.CSV;
+
+   int tupleCnt = 0;
+   for (Tuple tuple = scanner.next(); tuple != null; tuple = scanner.next()) {
+     if (expectsNullId) {
+       assertTrue(tuple.get(0) == null);
+     }
+     assertEquals(DatumFactory.createInt8(tupleCnt + 2), tuple.getLong(1));
+     assertEquals(DatumFactory.createFloat4(tupleCnt + 3), tuple.getFloat(2));
+     tupleCnt++;
+   }
+   scanner.close();
+
+   assertEquals(tupleNum, tupleCnt);
+ }
+
+ /**
+  * Writes a single 13-column tuple containing one value of each listed type
+  * and checks that every field read back equals the value written.
+  *
+  * @throws IOException if writing or reading the table file fails
+  */
+ @Test
+ public void testVariousTypes() throws IOException {
+   Schema schema = new Schema();
+   schema.addColumn("col1", Type.BOOLEAN);
+   schema.addColumn("col2", Type.BIT);
+   schema.addColumn("col3", Type.CHAR, 7);
+   schema.addColumn("col4", Type.INT2);
+   schema.addColumn("col5", Type.INT4);
+   schema.addColumn("col6", Type.INT8);
+   schema.addColumn("col7", Type.FLOAT4);
+   schema.addColumn("col8", Type.FLOAT8);
+   schema.addColumn("col9", Type.TEXT);
+   schema.addColumn("col10", Type.BLOB);
+   schema.addColumn("col11", Type.INET4);
+   schema.addColumn("col12", Type.NULL_TYPE);
+   schema.addColumn("col13", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName()));
+
+   Options options = new Options();
+   TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
+
+   Path tablePath = new Path(testDir, "testVariousTypes.data");
+   Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
+   appender.init();
+
+   QueryId queryid = new QueryId("12345", 5);
+   ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName());
+
+   Tuple tuple = new VTuple(13);
+   tuple.put(new Datum[] {
+       DatumFactory.createBool(true),
+       DatumFactory.createBit((byte) 0x99),
+       DatumFactory.createChar("hyunsik"),
+       DatumFactory.createInt2((short) 17),
+       DatumFactory.createInt4(59),
+       DatumFactory.createInt8(23L),
+       DatumFactory.createFloat4(77.9f),
+       DatumFactory.createFloat8(271.9f),
+       DatumFactory.createText("hyunsik"),
+       DatumFactory.createBlob("hyunsik".getBytes()),
+       DatumFactory.createInet4("192.168.0.1"),
+       NullDatum.get(),
+       factory.createDatum(queryid.getProto())
+   });
+   appender.addTuple(tuple);
+   appender.flush();
+   appender.close();
+
+   FileStatus status = fs.getFileStatus(tablePath);
+   FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
+   Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema, fragment);
+   try {
+     scanner.init();
+
+     Tuple retrieved;
+     while ((retrieved = scanner.next()) != null) {
+       for (int i = 0; i < tuple.size(); i++) {
+         assertEquals(tuple.get(i), retrieved.get(i));
+       }
+     }
+   } finally {
+     // The scanner was previously left open; close it like the other tests in this class do.
+     scanner.close();
+   }
+ }
+
+ /**
+  * RCFILE only: writes a single 13-column tuple using
+  * {@link TextSerializerDeserializer} as the RCFile serde and checks that every
+  * field read back equals the value written. Returns immediately for other
+  * store types.
+  *
+  * @throws IOException if writing or reading the table file fails
+  */
+ @Test
+ public void testRCFileTextSerializeDeserialize() throws IOException {
+   if (storeType != StoreType.RCFILE) {
+     return;
+   }
+
+   Schema schema = new Schema();
+   schema.addColumn("col1", Type.BOOLEAN);
+   schema.addColumn("col2", Type.BIT);
+   schema.addColumn("col3", Type.CHAR, 7);
+   schema.addColumn("col4", Type.INT2);
+   schema.addColumn("col5", Type.INT4);
+   schema.addColumn("col6", Type.INT8);
+   schema.addColumn("col7", Type.FLOAT4);
+   schema.addColumn("col8", Type.FLOAT8);
+   schema.addColumn("col9", Type.TEXT);
+   schema.addColumn("col10", Type.BLOB);
+   schema.addColumn("col11", Type.INET4);
+   schema.addColumn("col12", Type.NULL_TYPE);
+   schema.addColumn("col13", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName()));
+
+   Options options = new Options();
+   TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
+   meta.putOption(RCFile.SERDE, TextSerializerDeserializer.class.getName());
+
+   Path tablePath = new Path(testDir, "testVariousTypes.data");
+   Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
+   appender.init();
+
+   QueryId queryid = new QueryId("12345", 5);
+   ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName());
+
+   Tuple tuple = new VTuple(13);
+   tuple.put(new Datum[] {
+       DatumFactory.createBool(true),
+       DatumFactory.createBit((byte) 0x99),
+       DatumFactory.createChar("jinho"),
+       DatumFactory.createInt2((short) 17),
+       DatumFactory.createInt4(59),
+       DatumFactory.createInt8(23L),
+       DatumFactory.createFloat4(77.9f),
+       DatumFactory.createFloat8(271.9f),
+       DatumFactory.createText("jinho"),
+       DatumFactory.createBlob("hyunsik babo".getBytes()),
+       DatumFactory.createInet4("192.168.0.1"),
+       NullDatum.get(),
+       factory.createDatum(queryid.getProto())
+   });
+   appender.addTuple(tuple);
+   appender.flush();
+   appender.close();
+
+   FileStatus status = fs.getFileStatus(tablePath);
+   FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
+   Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema, fragment);
+   try {
+     scanner.init();
+
+     Tuple retrieved;
+     while ((retrieved = scanner.next()) != null) {
+       for (int i = 0; i < tuple.size(); i++) {
+         assertEquals(tuple.get(i), retrieved.get(i));
+       }
+     }
+   } finally {
+     // The scanner was previously left open; close it like the other tests in this class do.
+     scanner.close();
+   }
+ }
+
+ @Test
+ public void testRCFileBinarySerializeDeserialize() throws IOException {
+ if(storeType != StoreType.RCFILE) return;
+
+ Schema schema = new Schema();
+ schema.addColumn("col1", Type.BOOLEAN);
+ schema.addColumn("col2", Type.BIT);
+ schema.addColumn("col3", Type.CHAR, 7);
+ schema.addColumn("col4", Type.INT2);
+ schema.addColumn("col5", Type.INT4);
+ schema.addColumn("col6", Type.INT8);
+ schema.addColumn("col7", Type.FLOAT4);
+ schema.addColumn("col8", Type.FLOAT8);
+ schema.addColumn("col9", Type.TEXT);
+ schema.addColumn("col10", Type.BLOB);
+ schema.addColumn("col11", Type.INET4);
+ schema.addColumn("col12", Type.NULL_TYPE);
+ schema.addColumn("col13", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName()));
+
+ Options options = new Options();
+ TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
+ meta.putOption(RCFile.SERDE, BinarySerializerDeserializer.class.getName());
+
+ Path tablePath = new Path(testDir, "testVariousTypes.data");
+ Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
+ appender.init();
+
+ QueryId queryid = new QueryId("12345", 5);
+ ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName());
+
+ Tuple tuple = new VTuple(13);
+ tuple.put(new Datum[] {
+ DatumFactory.createBool(true),
+ DatumFactory.createBit((byte) 0x99),
+ DatumFactory.createChar("jinho"),
+ DatumFactory.createInt2((short) 17),
+ DatumFactory.createInt4(59),
+ DatumFactory.createInt8(23l),
+ DatumFactory.createFloat4(77.9f),
+ DatumFactory.createFloat8(271.9f),
+ DatumFactory.createText("jinho"),
+ DatumFactory.createBlob("hyunsik babo".getBytes()),
+ DatumFactory.createInet4("192.168.0.1"),
+ NullDatum.get(),
+ factory.createDatum(queryid.getProto())
+ });
+ appender.addTuple(tuple);
+ appender.flush();
+ appender.close();
+
+ FileStatus status = fs.getFileStatus(tablePath);
+ FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
+ Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema, fragment);
+ scanner.init();
+
+ Tuple retrieved;
+ while ((retrieved=scanner.next()) != null) {
+ for (int i = 0; i < tuple.size(); i++) {
+ assertEquals(tuple.get(i), retrieved.get(i));
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/test/java/org/apache/tajo/storage/TestTupleComparator.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestTupleComparator.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestTupleComparator.java
new file mode 100644
index 0000000..7092953
--- /dev/null
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestTupleComparator.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Unit test for {@code TupleComparator} ordering two tuples by a pair of sort keys.
+ */
+public class TestTupleComparator {
+
+  /** No fixture is required; kept as an empty JUnit hook. */
+  @Before
+  public void setUp() throws Exception {
+  }
+
+  /** Nothing to release; kept as an empty JUnit hook. */
+  @After
+  public void tearDown() throws Exception {
+  }
+
+  /**
+   * Compares two tuples on {@code col4} then {@code col5}. Both tuples hold 4 in
+   * {@code col4}, so the result is decided by {@code col5} ("abc" versus "abd").
+   * Only the sign of the comparison is checked, because a comparator is only
+   * required to return a negative, zero or positive value.
+   */
+  @Test
+  public final void testCompare() {
+    Schema schema = new Schema();
+    schema.addColumn("col1", Type.INT4);
+    schema.addColumn("col2", Type.INT4);
+    schema.addColumn("col3", Type.INT4);
+    schema.addColumn("col4", Type.INT4);
+    schema.addColumn("col5", Type.TEXT);
+
+    Tuple tuple1 = new VTuple(5);
+    Tuple tuple2 = new VTuple(5);
+
+    tuple1.put(
+        new Datum[] {
+            DatumFactory.createInt4(9),
+            DatumFactory.createInt4(3),
+            DatumFactory.createInt4(33),
+            DatumFactory.createInt4(4),
+            DatumFactory.createText("abc")});
+    tuple2.put(
+        new Datum[] {
+            DatumFactory.createInt4(1),
+            DatumFactory.createInt4(25),
+            DatumFactory.createInt4(109),
+            DatumFactory.createInt4(4),
+            DatumFactory.createText("abd")});
+
+    // NOTE(review): the two boolean flags are presumably (ascending, nullFirst) - confirm against SortSpec.
+    SortSpec sortKey1 = new SortSpec(schema.getColumnByFQN("col4"), true, false);
+    SortSpec sortKey2 = new SortSpec(schema.getColumnByFQN("col5"), true, false);
+
+    TupleComparator tc = new TupleComparator(schema,
+        new SortSpec[] {sortKey1, sortKey2});
+
+    // Normalise to -1/0/1 so the test does not depend on the magnitude returned by compare().
+    assertEquals(-1, Integer.signum(tc.compare(tuple1, tuple2)));
+    assertEquals(1, Integer.signum(tc.compare(tuple2, tuple1)));
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-storage/src/test/java/org/apache/tajo/storage/TestVTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestVTuple.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestVTuple.java
new file mode 100644
index 0000000..05f47a5
--- /dev/null
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestVTuple.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+
+import org.junit.Before;
+import org.junit.Test;
+import org.apache.tajo.datum.DatumFactory;
+
+import static org.junit.Assert.*;
+
+/**
+ * Unit tests for {@code VTuple}: slot presence, put/get, equality, hash codes,
+ * nested tuple insertion and cloning.
+ */
+public class TestVTuple {
+
+  /**
+   * No fixture is required; kept as an empty JUnit hook.
+   *
+   * @throws java.lang.Exception
+   */
+  @Before
+  public void setUp() throws Exception {
+
+  }
+
+  /**
+   * Fills a handful of slots in a 260-field tuple and checks that {@code contains}
+   * reports true for exactly the slots that were filled.
+   */
+  @Test
+  public void testContain() {
+    VTuple t1 = new VTuple(260);
+    t1.put(0, DatumFactory.createInt4(1));
+    t1.put(1, DatumFactory.createInt4(1));
+    t1.put(27, DatumFactory.createInt4(1));
+    t1.put(96, DatumFactory.createInt4(1));
+    t1.put(257, DatumFactory.createInt4(1));
+
+    assertTrue(t1.contains(0));
+    assertTrue(t1.contains(1));
+    assertFalse(t1.contains(2));
+    assertFalse(t1.contains(3));
+    assertFalse(t1.contains(4));
+    assertTrue(t1.contains(27));
+    assertFalse(t1.contains(28));
+    assertFalse(t1.contains(95));
+    assertTrue(t1.contains(96));
+    assertFalse(t1.contains(97));
+    assertTrue(t1.contains(257));
+  }
+
+  /**
+   * Stores a text, an int and a float datum and reads each back through the
+   * typed getters.
+   */
+  @Test
+  public void testPut() {
+    VTuple t1 = new VTuple(260);
+    t1.put(0, DatumFactory.createText("str"));
+    t1.put(1, DatumFactory.createInt4(2));
+    t1.put(257, DatumFactory.createFloat4(0.76f));
+
+    assertTrue(t1.contains(0));
+    assertTrue(t1.contains(1));
+    assertTrue(t1.contains(257));
+
+    // Expected value first, so a failure message reads the right way round.
+    assertEquals("str", t1.getString(0).toString());
+    assertEquals(2, t1.getInt(1).asInt4());
+    // Zero tolerance keeps the exact-match semantics of the former == comparison.
+    assertEquals(0.76f, t1.getFloat(257).asFloat4(), 0.0f);
+  }
+
+  /**
+   * Two tuples with the same datums in the same slots must be equal; a tuple
+   * whose third datum sits in a different slot must not be.
+   */
+  @Test
+  public void testEquals() {
+    Tuple t1 = new VTuple(5);
+    Tuple t2 = new VTuple(5);
+
+    t1.put(0, DatumFactory.createInt4(1));
+    t1.put(1, DatumFactory.createInt4(2));
+    t1.put(3, DatumFactory.createInt4(2));
+
+    t2.put(0, DatumFactory.createInt4(1));
+    t2.put(1, DatumFactory.createInt4(2));
+    t2.put(3, DatumFactory.createInt4(2));
+
+    assertEquals(t1, t2);
+
+    // t3 differs from t1 only in where the last datum is stored (slot 4 instead of 3).
+    Tuple t3 = new VTuple(5);
+    t3.put(0, DatumFactory.createInt4(1));
+    t3.put(1, DatumFactory.createInt4(2));
+    t3.put(4, DatumFactory.createInt4(2));
+
+    // assertNotSame would only compare references and could never fail here.
+    assertFalse(t1.equals(t3));
+  }
+
+  /**
+   * Equal tuples must produce equal hash codes; a tuple with different content
+   * is expected to produce a different one.
+   */
+  @Test
+  public void testHashCode() {
+    Tuple t1 = new VTuple(5);
+    Tuple t2 = new VTuple(5);
+
+    t1.put(0, DatumFactory.createInt4(1));
+    t1.put(1, DatumFactory.createInt4(2));
+    t1.put(3, DatumFactory.createInt4(2));
+    t1.put(4, DatumFactory.createText("hyunsik"));
+
+    t2.put(0, DatumFactory.createInt4(1));
+    t2.put(1, DatumFactory.createInt4(2));
+    t2.put(3, DatumFactory.createInt4(2));
+    t2.put(4, DatumFactory.createText("hyunsik"));
+
+    assertEquals(t1.hashCode(), t2.hashCode());
+
+    Tuple t3 = new VTuple(5);
+    t3.put(0, DatumFactory.createInt4(1));
+    t3.put(1, DatumFactory.createInt4(2));
+    t3.put(4, DatumFactory.createInt4(2));
+
+    // Compare the int values; assertNotSame on boxed ints only compares object identity.
+    assertTrue(t1.hashCode() != t3.hashCode());
+  }
+
+  /**
+   * Puts a two-field tuple into a five-field tuple at offset 3 and checks that
+   * the five slots then hold the values 1 through 5 in order.
+   */
+  @Test
+  public void testPutTuple() {
+    Tuple t1 = new VTuple(5);
+
+    t1.put(0, DatumFactory.createInt4(1));
+    t1.put(1, DatumFactory.createInt4(2));
+    t1.put(2, DatumFactory.createInt4(3));
+
+    Tuple t2 = new VTuple(2);
+    t2.put(0, DatumFactory.createInt4(4));
+    t2.put(1, DatumFactory.createInt4(5));
+
+    t1.put(3, t2);
+
+    for (int i = 0; i < 5; i++) {
+      assertEquals(i + 1, t1.get(i).asInt4());
+    }
+  }
+
+  /**
+   * A clone must be a distinct object that equals the original, must share the
+   * original's datum instances, and must be unaffected when the original is cleared.
+   *
+   * @throws CloneNotSupportedException if the tuple cannot be cloned
+   */
+  @Test
+  public void testClone() throws CloneNotSupportedException {
+    Tuple t1 = new VTuple(5);
+
+    t1.put(0, DatumFactory.createInt4(1));
+    t1.put(1, DatumFactory.createInt4(2));
+    t1.put(3, DatumFactory.createInt4(2));
+    t1.put(4, DatumFactory.createText("str"));
+
+    VTuple t2 = (VTuple) t1.clone();
+    assertNotSame(t1, t2);
+    assertEquals(t1, t2);
+
+    // The datum in slot 4 is the same instance in both tuples.
+    assertSame(t1.get(4), t2.get(4));
+
+    t1.clear();
+    assertFalse(t1.equals(t2));
+  }
+}