You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2014/12/12 09:22:39 UTC
[25/45] tajo git commit: TAJO-1233: Merge hbase_storage branch to the
master branch. (Hyoungjun Kim via hyunsik)
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java
deleted file mode 100644
index b742e6d..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java
+++ /dev/null
@@ -1,311 +0,0 @@
-/***
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple.offheap;
-
-import com.google.common.base.Preconditions;
-import com.google.protobuf.InvalidProtocolBufferException;
-import com.google.protobuf.Message;
-
-import org.apache.tajo.datum.*;
-import org.apache.tajo.exception.UnsupportedException;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.VTuple;
-import org.apache.tajo.util.SizeOf;
-import org.apache.tajo.util.StringUtils;
-import org.apache.tajo.util.UnsafeUtil;
-
-import sun.misc.Unsafe;
-import sun.nio.ch.DirectBuffer;
-
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-import java.nio.charset.Charset;
-
-import static org.apache.tajo.common.TajoDataTypes.DataType;
-
-public abstract class UnSafeTuple implements Tuple {
- private static final Unsafe UNSAFE = UnsafeUtil.unsafe;
-
- private DirectBuffer bb;
- private int relativePos;
- private int length;
- private DataType [] types;
-
- protected void set(ByteBuffer bb, int relativePos, int length, DataType [] types) {
- this.bb = (DirectBuffer) bb;
- this.relativePos = relativePos;
- this.length = length;
- this.types = types;
- }
-
- void set(ByteBuffer bb, DataType [] types) {
- set(bb, 0, bb.limit(), types);
- }
-
- @Override
- public int size() {
- return types.length;
- }
-
- public ByteBuffer nioBuffer() {
- return ((ByteBuffer)((ByteBuffer)bb).duplicate().position(relativePos).limit(relativePos + length)).slice();
- }
-
- public HeapTuple toHeapTuple() {
- byte [] bytes = new byte[length];
- UNSAFE.copyMemory(null, bb.address() + relativePos, bytes, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, length);
- return new HeapTuple(bytes, types);
- }
-
- public void copyFrom(UnSafeTuple tuple) {
- Preconditions.checkNotNull(tuple);
-
- ((ByteBuffer) bb).clear();
- if (length < tuple.length) {
- UnsafeUtil.free((ByteBuffer) bb);
- bb = (DirectBuffer) ByteBuffer.allocateDirect(tuple.length).order(ByteOrder.nativeOrder());
- this.relativePos = 0;
- this.length = tuple.length;
- }
-
- ((ByteBuffer) bb).put(tuple.nioBuffer());
- }
-
- private int getFieldOffset(int fieldId) {
- return UNSAFE.getInt(bb.address() + relativePos + SizeOf.SIZE_OF_INT + (fieldId * SizeOf.SIZE_OF_INT));
- }
-
- public long getFieldAddr(int fieldId) {
- int fieldOffset = getFieldOffset(fieldId);
- if (fieldOffset == -1) {
- throw new RuntimeException("Invalid Field Access: " + fieldId);
- }
- return bb.address() + relativePos + fieldOffset;
- }
-
- @Override
- public boolean contains(int fieldid) {
- return getFieldOffset(fieldid) > OffHeapRowBlock.NULL_FIELD_OFFSET;
- }
-
- @Override
- public boolean isNull(int fieldid) {
- return getFieldOffset(fieldid) == OffHeapRowBlock.NULL_FIELD_OFFSET;
- }
-
- @Override
- public boolean isNotNull(int fieldid) {
- return getFieldOffset(fieldid) > OffHeapRowBlock.NULL_FIELD_OFFSET;
- }
-
- @Override
- public void clear() {
- // nothing to do
- }
-
- @Override
- public void put(int fieldId, Datum value) {
- throw new UnsupportedException("UnSafeTuple does not support put(int, Datum).");
- }
-
- @Override
- public void put(int fieldId, Datum[] values) {
- throw new UnsupportedException("UnSafeTuple does not support put(int, Datum []).");
- }
-
- @Override
- public void put(int fieldId, Tuple tuple) {
- throw new UnsupportedException("UnSafeTuple does not support put(int, Tuple).");
- }
-
- @Override
- public void put(Datum[] values) {
- throw new UnsupportedException("UnSafeTuple does not support put(Datum []).");
- }
-
- @Override
- public Datum get(int fieldId) {
- if (isNull(fieldId)) {
- return NullDatum.get();
- }
-
- switch (types[fieldId].getType()) {
- case BOOLEAN:
- return DatumFactory.createBool(getBool(fieldId));
- case INT1:
- case INT2:
- return DatumFactory.createInt2(getInt2(fieldId));
- case INT4:
- return DatumFactory.createInt4(getInt4(fieldId));
- case INT8:
- return DatumFactory.createInt8(getInt4(fieldId));
- case FLOAT4:
- return DatumFactory.createFloat4(getFloat4(fieldId));
- case FLOAT8:
- return DatumFactory.createFloat8(getFloat8(fieldId));
- case TEXT:
- return DatumFactory.createText(getText(fieldId));
- case TIMESTAMP:
- return DatumFactory.createTimestamp(getInt8(fieldId));
- case DATE:
- return DatumFactory.createDate(getInt4(fieldId));
- case TIME:
- return DatumFactory.createTime(getInt8(fieldId));
- case INTERVAL:
- return getInterval(fieldId);
- case INET4:
- return DatumFactory.createInet4(getInt4(fieldId));
- case PROTOBUF:
- return getProtobufDatum(fieldId);
- default:
- throw new UnsupportedException("Unknown type: " + types[fieldId]);
- }
- }
-
- @Override
- public void setOffset(long offset) {
- }
-
- @Override
- public long getOffset() {
- return 0;
- }
-
- @Override
- public boolean getBool(int fieldId) {
- return UNSAFE.getByte(getFieldAddr(fieldId)) == 0x01;
- }
-
- @Override
- public byte getByte(int fieldId) {
- return UNSAFE.getByte(getFieldAddr(fieldId));
- }
-
- @Override
- public char getChar(int fieldId) {
- return UNSAFE.getChar(getFieldAddr(fieldId));
- }
-
- @Override
- public byte[] getBytes(int fieldId) {
- long pos = getFieldAddr(fieldId);
- int len = UNSAFE.getInt(pos);
- pos += SizeOf.SIZE_OF_INT;
-
- byte [] bytes = new byte[len];
- UNSAFE.copyMemory(null, pos, bytes, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, len);
- return bytes;
- }
-
- @Override
- public short getInt2(int fieldId) {
- long addr = getFieldAddr(fieldId);
- return UNSAFE.getShort(addr);
- }
-
- @Override
- public int getInt4(int fieldId) {
- return UNSAFE.getInt(getFieldAddr(fieldId));
- }
-
- @Override
- public long getInt8(int fieldId) {
- return UNSAFE.getLong(getFieldAddr(fieldId));
- }
-
- @Override
- public float getFloat4(int fieldId) {
- return UNSAFE.getFloat(getFieldAddr(fieldId));
- }
-
- @Override
- public double getFloat8(int fieldId) {
- return UNSAFE.getDouble(getFieldAddr(fieldId));
- }
-
- @Override
- public String getText(int fieldId) {
- long pos = getFieldAddr(fieldId);
- int len = UNSAFE.getInt(pos);
- pos += SizeOf.SIZE_OF_INT;
-
- byte [] bytes = new byte[len];
- UNSAFE.copyMemory(null, pos, bytes, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, len);
- return new String(bytes);
- }
-
- public IntervalDatum getInterval(int fieldId) {
- long pos = getFieldAddr(fieldId);
- int months = UNSAFE.getInt(pos);
- pos += SizeOf.SIZE_OF_INT;
- long millisecs = UNSAFE.getLong(pos);
- return new IntervalDatum(months, millisecs);
- }
-
- @Override
- public Datum getProtobufDatum(int fieldId) {
- byte [] bytes = getBytes(fieldId);
-
- ProtobufDatumFactory factory = ProtobufDatumFactory.get(types[fieldId].getCode());
- Message.Builder builder = factory.newBuilder();
- try {
- builder.mergeFrom(bytes);
- } catch (InvalidProtocolBufferException e) {
- return NullDatum.get();
- }
-
- return new ProtobufDatum(builder.build());
- }
-
- @Override
- public char[] getUnicodeChars(int fieldId) {
- long pos = getFieldAddr(fieldId);
- int len = UNSAFE.getInt(pos);
- pos += SizeOf.SIZE_OF_INT;
-
- byte [] bytes = new byte[len];
- UNSAFE.copyMemory(null, pos, bytes, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, len);
- return StringUtils.convertBytesToChars(bytes, Charset.forName("UTF-8"));
- }
-
- @Override
- public Tuple clone() throws CloneNotSupportedException {
- return toHeapTuple();
- }
-
- @Override
- public Datum[] getValues() {
- Datum [] datums = new Datum[size()];
- for (int i = 0; i < size(); i++) {
- if (contains(i)) {
- datums[i] = get(i);
- } else {
- datums[i] = NullDatum.get();
- }
- }
- return datums;
- }
-
- @Override
- public String toString() {
- return VTuple.toDisplayString(getValues());
- }
-
- public abstract void release();
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTupleBytesComparator.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTupleBytesComparator.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTupleBytesComparator.java
deleted file mode 100644
index 73e1e2f..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTupleBytesComparator.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/***
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple.offheap;
-
-import com.google.common.primitives.Longs;
-import com.google.common.primitives.UnsignedLongs;
-import org.apache.tajo.util.SizeOf;
-import org.apache.tajo.util.UnsafeUtil;
-import sun.misc.Unsafe;
-
-import java.nio.ByteOrder;
-
-/**
- * Compares length-prefixed byte values (e.g. UTF-8 text) stored in an UnSafeTuple, reading them
- * directly from native memory without copying. It is used by the compiled TupleComparator.
- */
-public class UnSafeTupleBytesComparator {
-  private static final Unsafe UNSAFE = UnsafeUtil.unsafe;
-
-  static final boolean littleEndian =
-      ByteOrder.nativeOrder().equals(ByteOrder.LITTLE_ENDIAN);
-
-  /**
-   * Lexicographically compares two values, each laid out as an int byte count followed by that
-   * many bytes. Bytes are compared as unsigned values; when one value is a prefix of the other,
-   * the shorter one sorts first.
-   *
-   * @param ptr1 native address of the left value's length prefix
-   * @param ptr2 native address of the right value's length prefix
-   * @return a negative number, zero, or a positive number as the left value is less than, equal
-   *         to, or greater than the right value
-   */
-  public static int compare(long ptr1, long ptr2) {
-    int lstrLen = UNSAFE.getInt(ptr1);
-    int rstrLen = UNSAFE.getInt(ptr2);
-
-    ptr1 += SizeOf.SIZE_OF_INT;
-    ptr2 += SizeOf.SIZE_OF_INT;
-
-    int minLength = Math.min(lstrLen, rstrLen);
-    int minWords = minLength / Longs.BYTES;
-
-    /*
-     * Compare 8 bytes at a time. Benchmarking shows comparing 8 bytes at a
-     * time is no slower than comparing 4 bytes at a time even on 32-bit.
-     * On the other hand, it is substantially faster on 64-bit.
-     */
-    for (int i = 0; i < minWords * Longs.BYTES; i += Longs.BYTES) {
-      long lw = UNSAFE.getLong(ptr1);
-      long rw = UNSAFE.getLong(ptr2);
-      long diff = lw ^ rw;
-
-      if (diff != 0) {
-        if (!littleEndian) {
-          // Big-endian: memory order matches numeric order, so an unsigned compare is lexicographic.
-          return UnsignedLongs.compare(lw, rw);
-        }
-
-        // Little-endian: the first differing byte in memory is the lowest-order differing byte of
-        // the words. Round the lowest set bit of diff down to a byte boundary to get its shift.
-        int n = Long.numberOfTrailingZeros(diff) & ~7;
-        return (int) (((lw >>> n) & 0xFFL) - ((rw >>> n) & 0xFFL));
-      }
-
-      ptr1 += SizeOf.SIZE_OF_LONG;
-      ptr2 += SizeOf.SIZE_OF_LONG;
-    }
-
-    // The epilogue to cover the last (minLength % 8) elements. Bytes are masked to unsigned so the
-    // ordering agrees with the 8-byte loop above; a signed subtraction would order 0x80..0xFF
-    // before 0x00..0x7F only when the first difference happens to fall in this tail.
-    for (int i = minWords * Longs.BYTES; i < minLength; i++) {
-      int result = (UNSAFE.getByte(ptr1++) & 0xFF) - (UNSAFE.getByte(ptr2++) & 0xFF);
-      if (result != 0) {
-        return result;
-      }
-    }
-    return lstrLen - rstrLen;
-  }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/ZeroCopyTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/ZeroCopyTuple.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/ZeroCopyTuple.java
deleted file mode 100644
index 51dbb29..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/ZeroCopyTuple.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple.offheap;
-
-import java.nio.ByteBuffer;
-
-import static org.apache.tajo.common.TajoDataTypes.DataType;
-
-/**
- * An {@link UnSafeTuple} that can be re-pointed at any row through a public {@code set} method.
- * Its {@link #release()} is a no-op.
- */
-public class ZeroCopyTuple extends UnSafeTuple {
-
-  /**
-   * Re-points this tuple at a row. This widens the superclass's protected method to public.
-   *
-   * @param buffer backing direct buffer
-   * @param offset index of the row start within {@code buffer}
-   * @param rowLength number of bytes in the row
-   * @param columnTypes column types, one per field
-   */
-  @Override
-  public void set(ByteBuffer buffer, int offset, int rowLength, DataType [] columnTypes) {
-    super.set(buffer, offset, rowLength, columnTypes);
-  }
-
-  /** Does nothing. */
-  @Override
-  public void release() {
-    // intentionally empty
-  }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/proto/IndexProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/proto/IndexProtos.proto b/tajo-storage/src/main/proto/IndexProtos.proto
deleted file mode 100644
index f5c8a08..0000000
--- a/tajo-storage/src/main/proto/IndexProtos.proto
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-option java_package = "org.apache.tajo.index";
-option java_outer_classname = "IndexProtos";
-option optimize_for = SPEED;
-option java_generic_services = false;
-option java_generate_equals_and_hash = true;
-
-import "CatalogProtos.proto";
-
-// Message describing a tuple comparator. NOTE(review): the field roles below are inferred from
-// their names and types — confirm against the code that builds/reads this message.
-message TupleComparatorProto {
-  // Schema of the compared tuples (presumably).
-  required SchemaProto schema = 1;
-  // Sort specifications; presumably one entry per sort key.
-  repeated SortSpecProto sortSpecs = 2;
-  // Comparator specifications; presumably one entry per sort key.
-  repeated TupleComparatorSpecProto compSpecs = 3;
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/resources/storage-default.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/resources/storage-default.xml b/tajo-storage/src/main/resources/storage-default.xml
deleted file mode 100644
index e861b7d..0000000
--- a/tajo-storage/src/main/resources/storage-default.xml
+++ /dev/null
@@ -1,175 +0,0 @@
-<?xml version="1.0"?>
-<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
-
-<!--
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- -->
-
-<configuration>
- <property>
- <name>tajo.storage.manager.maxReadBytes</name>
- <value>8388608</value>
- <description></description>
- </property>
-
- <property>
- <name>tajo.storage.manager.concurrency.perDisk</name>
- <value>1</value>
- <description></description>
- </property>
-
- <!--- Registered Scanner Handler -->
- <property>
- <name>tajo.storage.scanner-handler</name>
- <value>textfile,csv,json,raw,rcfile,row,parquet,sequencefile,avro</value>
- </property>
-
- <!--- Fragment Class Configurations -->
- <property>
- <name>tajo.storage.fragment.textfile.class</name>
- <value>org.apache.tajo.storage.fragment.FileFragment</value>
- </property>
- <property>
- <name>tajo.storage.fragment.csv.class</name>
- <value>org.apache.tajo.storage.fragment.FileFragment</value>
- </property>
- <property>
- <name>tajo.storage.fragment.json.class</name>
- <value>org.apache.tajo.storage.fragment.FileFragment</value>
- </property>
- <property>
- <name>tajo.storage.fragment.raw.class</name>
- <value>org.apache.tajo.storage.fragment.FileFragment</value>
- </property>
- <property>
- <name>tajo.storage.fragment.rcfile.class</name>
- <value>org.apache.tajo.storage.fragment.FileFragment</value>
- </property>
- <property>
- <name>tajo.storage.fragment.row.class</name>
- <value>org.apache.tajo.storage.fragment.FileFragment</value>
- </property>
- <property>
- <name>tajo.storage.fragment.parquet.class</name>
- <value>org.apache.tajo.storage.fragment.FileFragment</value>
- </property>
- <property>
- <name>tajo.storage.fragment.sequencefile.class</name>
- <value>org.apache.tajo.storage.fragment.FileFragment</value>
- </property>
- <property>
- <name>tajo.storage.fragment.avro.class</name>
- <value>org.apache.tajo.storage.fragment.FileFragment</value>
- </property>
-
- <!--- Scanner Handler -->
- <property>
- <name>tajo.storage.scanner-handler.textfile.class</name>
- <value>org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileScanner</value>
- </property>
-
- <property>
- <name>tajo.storage.scanner-handler.csv.class</name>
- <value>org.apache.tajo.storage.CSVFile$CSVScanner</value>
- </property>
-
- <property>
- <name>tajo.storage.scanner-handler.json.class</name>
- <value>org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileScanner</value>
- </property>
-
- <property>
- <name>tajo.storage.scanner-handler.raw.class</name>
- <value>org.apache.tajo.storage.RawFile$RawFileScanner</value>
- </property>
-
- <property>
- <name>tajo.storage.scanner-handler.rcfile.class</name>
- <value>org.apache.tajo.storage.rcfile.RCFile$RCFileScanner</value>
- </property>
-
- <property>
- <name>tajo.storage.scanner-handler.rowfile.class</name>
- <value>org.apache.tajo.storage.RowFile$RowFileScanner</value>
- </property>
-
- <property>
- <name>tajo.storage.scanner-handler.parquet.class</name>
- <value>org.apache.tajo.storage.parquet.ParquetScanner</value>
- </property>
-
- <property>
- <name>tajo.storage.scanner-handler.sequencefile.class</name>
- <value>org.apache.tajo.storage.sequencefile.SequenceFileScanner</value>
- </property>
-
- <property>
- <name>tajo.storage.scanner-handler.avro.class</name>
- <value>org.apache.tajo.storage.avro.AvroScanner</value>
- </property>
-
- <!--- Appender Handler -->
- <property>
- <name>tajo.storage.appender-handler</name>
- <value>textfile,csv,raw,rcfile,row,parquet,sequencefile,avro</value>
- </property>
-
- <property>
- <name>tajo.storage.appender-handler.textfile.class</name>
- <value>org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileAppender</value>
- </property>
-
- <property>
- <name>tajo.storage.appender-handler.csv.class</name>
- <value>org.apache.tajo.storage.CSVFile$CSVAppender</value>
- </property>
-
- <property>
- <name>tajo.storage.appender-handler.json.class</name>
- <value>org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileAppender</value>
- </property>
-
- <property>
- <name>tajo.storage.appender-handler.raw.class</name>
- <value>org.apache.tajo.storage.RawFile$RawFileAppender</value>
- </property>
-
- <property>
- <name>tajo.storage.appender-handler.rcfile.class</name>
- <value>org.apache.tajo.storage.rcfile.RCFile$RCFileAppender</value>
- </property>
-
- <property>
- <name>tajo.storage.appender-handler.rowfile.class</name>
- <value>org.apache.tajo.storage.RowFile$RowFileAppender</value>
- </property>
-
- <property>
- <name>tajo.storage.appender-handler.parquet.class</name>
- <value>org.apache.tajo.storage.parquet.ParquetAppender</value>
- </property>
-
- <property>
- <name>tajo.storage.appender-handler.sequencefile.class</name>
- <value>org.apache.tajo.storage.sequencefile.SequenceFileAppender</value>
- </property>
-
- <property>
- <name>tajo.storage.appender-handler.avro.class</name>
- <value>org.apache.tajo.storage.avro.AvroAppender</value>
- </property>
-</configuration>
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/test/java/org/apache/tajo/HttpFileServer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/HttpFileServer.java b/tajo-storage/src/test/java/org/apache/tajo/HttpFileServer.java
deleted file mode 100644
index cf8a54e..0000000
--- a/tajo-storage/src/test/java/org/apache/tajo/HttpFileServer.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.net.NetUtils;
-import org.jboss.netty.bootstrap.ServerBootstrap;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.group.ChannelGroup;
-import org.jboss.netty.channel.group.ChannelGroupFuture;
-import org.jboss.netty.channel.group.DefaultChannelGroup;
-import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
-
-import java.net.InetSocketAddress;
-import java.util.concurrent.Executors;
-
-/**
- * Small Netty 3 HTTP server used by tests. Its channel pipeline is built by
- * {@link HttpFileServerPipelineFactory}.
- */
-public class HttpFileServer {
-  private static final Log LOG = LogFactory.getLog(HttpFileServer.class);
-
-  private final InetSocketAddress addr;
-  /** Actual bound address; null until {@link #start()} has run. */
-  private InetSocketAddress bindAddr;
-  private final ServerBootstrap bootstrap;
-  private final ChannelFactory factory;
-  private final ChannelGroup channelGroup;
-
-  /** Creates a server that binds to {@code addr} when {@link #start()} is called. */
-  public HttpFileServer(final InetSocketAddress addr) {
-    this.addr = addr;
-    // Boss and worker executors are cached thread pools; the last argument is the worker count.
-    this.factory = new NioServerSocketChannelFactory(
-        Executors.newCachedThreadPool(), Executors.newCachedThreadPool(),
-        2);
-
-    // Configure the server.
-    this.bootstrap = new ServerBootstrap(factory);
-    // Set up the event pipeline factory.
-    this.bootstrap.setPipelineFactory(new HttpFileServerPipelineFactory());
-    this.channelGroup = new DefaultChannelGroup();
-  }
-
-  /** Creates a server for an address string parsed by {@code NetUtils.createSocketAddr}. */
-  public HttpFileServer(String bindaddr) {
-    this(NetUtils.createSocketAddr(bindaddr));
-  }
-
-  /** Binds the server socket and records the actual bound address. */
-  public void start() {
-    // Bind and start to accept incoming connections.
-    Channel channel = bootstrap.bind(addr);
-    channelGroup.add(channel);
-    this.bindAddr = (InetSocketAddress) channel.getLocalAddress();
-    LOG.info("HttpFileServer starts up ("
-        + this.bindAddr.getAddress().getHostAddress() + ":" + this.bindAddr.getPort()
-        + ")");
-  }
-
-  /** Returns the bound address, or null if {@link #start()} has not been called. */
-  public InetSocketAddress getBindAddress() {
-    return this.bindAddr;
-  }
-
-  /** Closes all tracked channels, waits for them to close, and releases the channel factory. */
-  public void stop() {
-    ChannelGroupFuture future = channelGroup.close();
-    future.awaitUninterruptibly();
-    factory.releaseExternalResources();
-
-    // bindAddr is only set by start(); without this guard, stopping a server that was never
-    // started threw a NullPointerException while logging.
-    if (this.bindAddr != null) {
-      LOG.info("HttpFileServer shutdown ("
-          + this.bindAddr.getAddress().getHostAddress() + ":"
-          + this.bindAddr.getPort() + ")");
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/test/java/org/apache/tajo/HttpFileServerHandler.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/HttpFileServerHandler.java b/tajo-storage/src/test/java/org/apache/tajo/HttpFileServerHandler.java
deleted file mode 100644
index 6c77317..0000000
--- a/tajo-storage/src/test/java/org/apache/tajo/HttpFileServerHandler.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo;
-
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.*;
-import org.jboss.netty.handler.codec.frame.TooLongFrameException;
-import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
-import org.jboss.netty.handler.codec.http.HttpRequest;
-import org.jboss.netty.handler.codec.http.HttpResponse;
-import org.jboss.netty.handler.codec.http.HttpResponseStatus;
-import org.jboss.netty.handler.ssl.SslHandler;
-import org.jboss.netty.handler.stream.ChunkedFile;
-import org.jboss.netty.util.CharsetUtil;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.RandomAccessFile;
-import java.io.UnsupportedEncodingException;
-import java.net.URLDecoder;
-
-import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
-import static org.jboss.netty.handler.codec.http.HttpHeaders.isKeepAlive;
-import static org.jboss.netty.handler.codec.http.HttpHeaders.setContentLength;
-import static org.jboss.netty.handler.codec.http.HttpMethod.GET;
-import static org.jboss.netty.handler.codec.http.HttpResponseStatus.*;
-import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;
-
-/**
- * Serves local files in response to HTTP GET requests; intended for tests.
- * This is an implementation copied from HttpStaticFileServerHandler.java of netty 3.6.
- *
- * <p>The URL-decoded request URI is used directly as a filesystem path. The only protection is
- * {@link #sanitizeUri(String)}, which rejects path segments that start or end with '.', so this
- * handler must not be exposed outside test environments.
- */
-public class HttpFileServerHandler extends SimpleChannelUpstreamHandler {
-
-  /**
-   * Answers a GET with the requested file's content, or with an error status: 405 for non-GET,
-   * 403 for rejected paths and non-regular files, 404 for hidden or missing files.
-   */
-  @Override
-  public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
-    HttpRequest request = (HttpRequest) e.getMessage();
-    if (request.getMethod() != GET) {
-      sendError(ctx, METHOD_NOT_ALLOWED);
-      return;
-    }
-
-    final String path = sanitizeUri(request.getUri());
-    if (path == null) {
-      sendError(ctx, FORBIDDEN);
-      return;
-    }
-
-    File file = new File(path);
-    if (file.isHidden() || !file.exists()) {
-      sendError(ctx, NOT_FOUND);
-      return;
-    }
-    if (!file.isFile()) {
-      sendError(ctx, FORBIDDEN);
-      return;
-    }
-
-    RandomAccessFile raf;
-    try {
-      raf = new RandomAccessFile(file, "r");
-    } catch (FileNotFoundException fnfe) {
-      sendError(ctx, NOT_FOUND);
-      return;
-    }
-
-    // Until the content write has been handed to Netty, this method owns raf. If anything throws
-    // before then (raf.length(), the ChunkedFile constructor, the header write), close raf here so
-    // the file descriptor is not leaked.
-    boolean handedOff = false;
-    try {
-      long fileLength = raf.length();
-
-      HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
-      setContentLength(response, fileLength);
-      setContentTypeHeader(response);
-
-      Channel ch = e.getChannel();
-
-      // Write the initial line and the header.
-      ch.write(response);
-
-      // Write the content.
-      ChannelFuture writeFuture;
-      if (ch.getPipeline().get(SslHandler.class) != null) {
-        // Cannot use zero-copy with HTTPS.
-        writeFuture = ch.write(new ChunkedFile(raf, 0, fileLength, 8192));
-      } else {
-        // No encryption - use zero-copy.
-        final FileRegion region =
-            new DefaultFileRegion(raf.getChannel(), 0, fileLength);
-        writeFuture = ch.write(region);
-        writeFuture.addListener(new ChannelFutureProgressListener() {
-          public void operationComplete(ChannelFuture future) {
-            region.releaseExternalResources();
-          }
-
-          public void operationProgressed(
-              ChannelFuture future, long amount, long current, long total) {
-            System.out.printf("%s: %d / %d (+%d)%n", path, current, total, amount);
-          }
-        });
-      }
-      handedOff = true;
-
-      // Decide whether to close the connection or not.
-      if (!isKeepAlive(request)) {
-        // Close the connection when the whole content is written out.
-        writeFuture.addListener(ChannelFutureListener.CLOSE);
-      }
-    } finally {
-      if (!handedOff) {
-        raf.close();
-      }
-    }
-  }
-
-  /**
-   * Replies 400 to over-long frames and, when the channel is still connected, 500 to any other
-   * failure (after printing its stack trace).
-   */
-  @Override
-  public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
-      throws Exception {
-    Channel ch = e.getChannel();
-    Throwable cause = e.getCause();
-    if (cause instanceof TooLongFrameException) {
-      sendError(ctx, BAD_REQUEST);
-      return;
-    }
-
-    cause.printStackTrace();
-    if (ch.isConnected()) {
-      sendError(ctx, INTERNAL_SERVER_ERROR);
-    }
-  }
-
-  /**
-   * URL-decodes {@code uri} and converts '/' to the platform separator.
-   *
-   * @return the resulting path, or null if any path segment starts or ends with '.'
-   */
-  private static String sanitizeUri(String uri) {
-    // Decode the path.
-    try {
-      uri = URLDecoder.decode(uri, "UTF-8");
-    } catch (UnsupportedEncodingException e) {
-      try {
-        uri = URLDecoder.decode(uri, "ISO-8859-1");
-      } catch (UnsupportedEncodingException e1) {
-        // Unreachable in practice: every JVM must support both charsets.
-        throw new Error(e1);
-      }
-    }
-
-    // Convert file separators.
-    uri = uri.replace('/', File.separatorChar);
-
-    // Simplistic dumb security check.
-    // You will have to do something serious in the production environment.
-    if (uri.contains(File.separator + '.') ||
-        uri.contains('.' + File.separator) ||
-        uri.startsWith(".") || uri.endsWith(".")) {
-      return null;
-    }
-
-    return uri;
-  }
-
-  /** Sends a plain-text error response with {@code status} and then closes the connection. */
-  private static void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) {
-    HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status);
-    response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8");
-    response.setContent(ChannelBuffers.copiedBuffer(
-        "Failure: " + status.toString() + "\r\n",
-        CharsetUtil.UTF_8));
-
-    // Close the connection as soon as the error message is sent.
-    ctx.getChannel().write(response).addListener(ChannelFutureListener.CLOSE);
-  }
-
-  /**
-   * Sets the content type header for the HTTP Response
-   *
-   * @param response
-   *          HTTP response
-   */
-  private static void setContentTypeHeader(HttpResponse response) {
-    response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8");
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/test/java/org/apache/tajo/HttpFileServerPipelineFactory.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/HttpFileServerPipelineFactory.java b/tajo-storage/src/test/java/org/apache/tajo/HttpFileServerPipelineFactory.java
deleted file mode 100644
index cecf93b..0000000
--- a/tajo-storage/src/test/java/org/apache/tajo/HttpFileServerPipelineFactory.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo;
-
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.handler.codec.http.HttpChunkAggregator;
-import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
-import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
-import org.jboss.netty.handler.stream.ChunkedWriteHandler;
-
-import static org.jboss.netty.channel.Channels.pipeline;
-
-// Uncomment the following lines if you want HTTPS
-//import javax.net.ssl.SSLEngine;
-//import org.jboss.netty.example.securechat.SecureChatSslContextFactory;
-//import org.jboss.netty.handler.ssl.SslHandler;
-
-//this class is copied from HttpStaticFileServerPipelineFactory.java of netty 3.6
-public class HttpFileServerPipelineFactory implements ChannelPipelineFactory {
- public ChannelPipeline getPipeline() throws Exception {
- // Create a default pipeline implementation.
- ChannelPipeline pipeline = pipeline();
-
- // Uncomment the following lines if you want HTTPS
- //SSLEngine engine = SecureChatSslContextFactory.getServerContext().createSSLEngine();
- //engine.setUseClientMode(false);
- //pipeline.addLast("ssl", new SslHandler(engine));
-
- pipeline.addLast("decoder", new HttpRequestDecoder());
- pipeline.addLast("aggregator", new HttpChunkAggregator(65536));
- pipeline.addLast("encoder", new HttpResponseEncoder());
- pipeline.addLast("chunkedWriter", new ChunkedWriteHandler());
-
- pipeline.addLast("handler", new HttpFileServerHandler());
- return pipeline;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
deleted file mode 100644
index fd5a63e..0000000
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.compress.*;
-import org.apache.hadoop.io.compress.zlib.ZlibFactory;
-import org.apache.hadoop.util.NativeCodeLoader;
-import org.apache.tajo.catalog.CatalogUtil;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.common.TajoDataTypes.Type;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.storage.sequencefile.SequenceFileScanner;
-import org.apache.tajo.storage.text.DelimitedTextFile;
-import org.apache.tajo.util.CommonTestingUtil;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
-
-import static org.junit.Assert.*;
-
-@RunWith(Parameterized.class)
-public class TestCompressionStorages {
- private TajoConf conf;
- private static String TEST_PATH = "target/test-data/TestCompressionStorages";
-
- private StoreType storeType;
- private Path testDir;
- private FileSystem fs;
-
- public TestCompressionStorages(StoreType type) throws IOException {
- this.storeType = type;
- conf = new TajoConf();
-
- testDir = CommonTestingUtil.getTestDir(TEST_PATH);
- fs = testDir.getFileSystem(conf);
- }
-
- @Parameterized.Parameters
- public static Collection<Object[]> generateParameters() {
- return Arrays.asList(new Object[][]{
- {StoreType.CSV},
- {StoreType.RCFILE},
- {StoreType.SEQUENCEFILE},
- {StoreType.TEXTFILE}
- });
- }
-
- @Test
- public void testDeflateCodecCompressionData() throws IOException {
- storageCompressionTest(storeType, DeflateCodec.class);
- }
-
- @Test
- public void testGzipCodecCompressionData() throws IOException {
- if (storeType == StoreType.RCFILE) {
- if( ZlibFactory.isNativeZlibLoaded(conf)) {
- storageCompressionTest(storeType, GzipCodec.class);
- }
- } else if (storeType == StoreType.SEQUENCEFILE) {
- if( ZlibFactory.isNativeZlibLoaded(conf)) {
- storageCompressionTest(storeType, GzipCodec.class);
- }
- } else {
- storageCompressionTest(storeType, GzipCodec.class);
- }
- }
-
- @Test
- public void testSnappyCodecCompressionData() throws IOException {
- if (SnappyCodec.isNativeCodeLoaded()) {
- storageCompressionTest(storeType, SnappyCodec.class);
- }
- }
-
- @Test
- public void testLz4CodecCompressionData() throws IOException {
- if(NativeCodeLoader.isNativeCodeLoaded() && Lz4Codec.isNativeCodeLoaded())
- storageCompressionTest(storeType, Lz4Codec.class);
- }
-
- private void storageCompressionTest(StoreType storeType, Class<? extends CompressionCodec> codec) throws IOException {
- Schema schema = new Schema();
- schema.addColumn("id", Type.INT4);
- schema.addColumn("age", Type.FLOAT4);
- schema.addColumn("name", Type.TEXT);
-
- TableMeta meta = CatalogUtil.newTableMeta(storeType);
- meta.putOption("compression.codec", codec.getCanonicalName());
- meta.putOption("compression.type", SequenceFile.CompressionType.BLOCK.name());
- meta.putOption("rcfile.serde", TextSerializerDeserializer.class.getName());
- meta.putOption("sequencefile.serde", TextSerializerDeserializer.class.getName());
-
- String fileName = "Compression_" + codec.getSimpleName();
- Path tablePath = new Path(testDir, fileName);
- Appender appender = StorageManager.getStorageManager(conf).getAppender(meta, schema, tablePath);
- appender.enableStats();
-
- appender.init();
-
- String extension = "";
- if (appender instanceof CSVFile.CSVAppender) {
- extension = ((CSVFile.CSVAppender) appender).getExtension();
- } else if (appender instanceof DelimitedTextFile.DelimitedTextFileAppender) {
- extension = ((DelimitedTextFile.DelimitedTextFileAppender) appender).getExtension();
- }
-
- int tupleNum = 100000;
- VTuple vTuple;
-
- for (int i = 0; i < tupleNum; i++) {
- vTuple = new VTuple(3);
- vTuple.put(0, DatumFactory.createInt4(i + 1));
- vTuple.put(1, DatumFactory.createFloat4((float) i));
- vTuple.put(2, DatumFactory.createText(String.valueOf(i)));
- appender.addTuple(vTuple);
- }
- appender.close();
-
- TableStats stat = appender.getStats();
- assertEquals(tupleNum, stat.getNumRows().longValue());
- tablePath = tablePath.suffix(extension);
- FileStatus status = fs.getFileStatus(tablePath);
- long fileLen = status.getLen();
- FileFragment[] tablets = new FileFragment[1];
- tablets[0] = new FileFragment(fileName, tablePath, 0, fileLen);
-
- Scanner scanner = StorageManager.getStorageManager(conf).getScanner(meta, schema, tablets[0], schema);
-
- if (StoreType.CSV == storeType) {
- if (SplittableCompressionCodec.class.isAssignableFrom(codec)) {
- assertTrue(scanner.isSplittable());
- } else {
- assertFalse(scanner.isSplittable());
- }
- }
- scanner.init();
-
- if (storeType == StoreType.SEQUENCEFILE) {
- assertTrue(scanner instanceof SequenceFileScanner);
- Writable key = ((SequenceFileScanner) scanner).getKey();
- assertEquals(key.getClass().getCanonicalName(), LongWritable.class.getCanonicalName());
- }
-
- int tupleCnt = 0;
- Tuple tuple;
- while ((tuple = scanner.next()) != null) {
- tupleCnt++;
- }
- scanner.close();
- assertEquals(tupleNum, tupleCnt);
- assertNotSame(appender.getStats().getNumBytes().longValue(), scanner.getInputStats().getNumBytes().longValue());
- assertEquals(appender.getStats().getNumRows().longValue(), scanner.getInputStats().getNumRows().longValue());
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java
deleted file mode 100644
index 93fb12b..0000000
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.catalog.CatalogUtil;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.proto.CatalogProtos;
-import org.apache.tajo.common.TajoDataTypes.Type;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.datum.NullDatum;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.util.FileUtil;
-import org.junit.Test;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.URL;
-
-import static org.junit.Assert.*;
-
-public class TestDelimitedTextFile {
-
- private static Schema schema = new Schema();
-
- private static Tuple baseTuple = new VTuple(10);
-
- static {
- schema.addColumn("col1", Type.BOOLEAN);
- schema.addColumn("col2", Type.CHAR, 7);
- schema.addColumn("col3", Type.INT2);
- schema.addColumn("col4", Type.INT4);
- schema.addColumn("col5", Type.INT8);
- schema.addColumn("col6", Type.FLOAT4);
- schema.addColumn("col7", Type.FLOAT8);
- schema.addColumn("col8", Type.TEXT);
- schema.addColumn("col9", Type.BLOB);
- schema.addColumn("col10", Type.INET4);
-
- baseTuple.put(new Datum[] {
- DatumFactory.createBool(true), // 0
- DatumFactory.createChar("hyunsik"), // 1
- DatumFactory.createInt2((short) 17), // 2
- DatumFactory.createInt4(59), // 3
- DatumFactory.createInt8(23l), // 4
- DatumFactory.createFloat4(77.9f), // 5
- DatumFactory.createFloat8(271.9d), // 6
- DatumFactory.createText("hyunsik"), // 7
- DatumFactory.createBlob("hyunsik".getBytes()),// 8
- DatumFactory.createInet4("192.168.0.1"), // 9
- });
- }
-
- public static Path getResourcePath(String path, String suffix) {
- URL resultBaseURL = ClassLoader.getSystemResource(path);
- return new Path(resultBaseURL.toString(), suffix);
- }
-
- public static Path getResultPath(Class clazz, String fileName) {
- return new Path (getResourcePath("results", clazz.getSimpleName()), fileName);
- }
-
- public static String getResultText(Class clazz, String fileName) throws IOException {
- FileSystem localFS = FileSystem.getLocal(new Configuration());
- Path path = getResultPath(clazz, fileName);
- Preconditions.checkState(localFS.exists(path) && localFS.isFile(path));
- return FileUtil.readTextFile(new File(path.toUri()));
- }
-
- private static final FileFragment getFileFragment(String fileName) throws IOException {
- TajoConf conf = new TajoConf();
- Path tablePath = new Path(getResourcePath("dataset", "TestDelimitedTextFile"), fileName);
- FileSystem fs = FileSystem.getLocal(conf);
- FileStatus status = fs.getFileStatus(tablePath);
- return new FileFragment("table", tablePath, 0, status.getLen());
- }
-
- @Test
- public void testIgnoreAllErrors() throws IOException {
- TajoConf conf = new TajoConf();
-
- TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.JSON);
- meta.putOption(StorageUtil.TEXT_ERROR_TOLERANCE_MAXNUM, "-1");
- FileFragment fragment = getFileFragment("testErrorTolerance1.json");
- Scanner scanner = StorageManager.getStorageManager(conf).getScanner(meta, schema, fragment);
- scanner.init();
-
- Tuple tuple;
- int i = 0;
- while ((tuple = scanner.next()) != null) {
- assertEquals(baseTuple, tuple);
- i++;
- }
- assertEquals(3, i);
- scanner.close();
- }
-
- @Test
- public void testIgnoreOneErrorTolerance() throws IOException {
-
-
- TajoConf conf = new TajoConf();
-
- TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.JSON);
- meta.putOption(StorageUtil.TEXT_ERROR_TOLERANCE_MAXNUM, "1");
- FileFragment fragment = getFileFragment("testErrorTolerance1.json");
- Scanner scanner = StorageManager.getStorageManager(conf).getScanner(meta, schema, fragment);
- scanner.init();
-
- assertNotNull(scanner.next());
- assertNotNull(scanner.next());
- try {
- scanner.next();
- } catch (IOException ioe) {
- System.out.println(ioe);
- return;
- } finally {
- scanner.close();
- }
- fail();
- }
-
- @Test
- public void testNoErrorTolerance() throws IOException {
- TajoConf conf = new TajoConf();
- TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.JSON);
- meta.putOption(StorageUtil.TEXT_ERROR_TOLERANCE_MAXNUM, "0");
- FileFragment fragment = getFileFragment("testErrorTolerance2.json");
- Scanner scanner = StorageManager.getStorageManager(conf).getScanner(meta, schema, fragment);
- scanner.init();
-
- try {
- scanner.next();
- } catch (IOException ioe) {
- return;
- } finally {
- scanner.close();
- }
- fail();
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/test/java/org/apache/tajo/storage/TestFileSystems.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestFileSystems.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestFileSystems.java
deleted file mode 100644
index bec0daf..0000000
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestFileSystems.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.catalog.CatalogUtil;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
-import org.apache.tajo.common.TajoDataTypes.Type;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.io.IOException;
-import java.net.URI;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-@RunWith(Parameterized.class)
-public class TestFileSystems {
-
- private static String TEST_PATH = "target/test-data/TestFileSystem";
- private Configuration conf;
- private StorageManager sm;
- private FileSystem fs;
- private Path testDir;
-
- public TestFileSystems(FileSystem fs) throws IOException {
- this.fs = fs;
- this.conf = fs.getConf();
- this.testDir = getTestDir(this.fs, TEST_PATH);
- this.sm = StorageManager.getStorageManager(new TajoConf(this.conf));
- }
-
- public Path getTestDir(FileSystem fs, String dir) throws IOException {
- Path path = new Path(dir);
- if (fs.exists(path))
- fs.delete(path, true);
-
- fs.mkdirs(path);
-
- return fs.makeQualified(path);
- }
-
- @Parameterized.Parameters
- public static Collection<Object[]> generateParameters() throws IOException {
- return Arrays.asList(new Object[][]{
- {FileSystem.getLocal(new TajoConf())},
- });
- }
-
- @Before
- public void setup() throws IOException {
- if (!(fs instanceof LocalFileSystem)) {
- conf.set("fs.local.block.size", "10");
- fs.initialize(URI.create(fs.getScheme() + ":///"), conf);
- fs.setConf(conf);
- }
- }
-
- @After
- public void tearDown() throws IOException {
- if (!(fs instanceof LocalFileSystem)) {
- fs.setConf(new TajoConf());
- }
- }
-
- @Test
- public void testBlockSplit() throws IOException {
-
- Schema schema = new Schema();
- schema.addColumn("id", Type.INT4);
- schema.addColumn("age", Type.INT4);
- schema.addColumn("name", Type.TEXT);
-
- TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV);
-
- Tuple[] tuples = new Tuple[4];
- for (int i = 0; i < tuples.length; i++) {
- tuples[i] = new VTuple(3);
- tuples[i]
- .put(new Datum[]{DatumFactory.createInt4(i),
- DatumFactory.createInt4(i + 32),
- DatumFactory.createText("name" + i)});
- }
-
- Path path = StorageUtil.concatPath(testDir, "testGetScannerAndAppender",
- "table.csv");
- fs.mkdirs(path.getParent());
-
- Appender appender = sm.getAppender(meta, schema, path);
- appender.init();
- for (Tuple t : tuples) {
- appender.addTuple(t);
- }
- appender.close();
- FileStatus fileStatus = fs.getFileStatus(path);
-
- List<FileFragment> splits = sm.getSplits("table", meta, schema, path);
- int splitSize = (int) Math.ceil(fileStatus.getLen() / (double) fileStatus.getBlockSize());
- assertEquals(splitSize, splits.size());
-
- for (FileFragment fragment : splits) {
- assertTrue(fragment.getEndKey() <= fileStatus.getBlockSize());
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/test/java/org/apache/tajo/storage/TestFrameTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestFrameTuple.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestFrameTuple.java
deleted file mode 100644
index 387fed5..0000000
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestFrameTuple.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.DatumFactory;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-public class TestFrameTuple {
- private Tuple tuple1;
- private Tuple tuple2;
-
- @Before
- public void setUp() throws Exception {
- tuple1 = new VTuple(11);
- tuple1.put(new Datum[] {
- DatumFactory.createBool(true),
- DatumFactory.createBit((byte) 0x99),
- DatumFactory.createChar('9'),
- DatumFactory.createInt2((short) 17),
- DatumFactory.createInt4(59),
- DatumFactory.createInt8(23l),
- DatumFactory.createFloat4(77.9f),
- DatumFactory.createFloat8(271.9f),
- DatumFactory.createText("hyunsik"),
- DatumFactory.createBlob("hyunsik".getBytes()),
- DatumFactory.createInet4("192.168.0.1")
- });
-
- tuple2 = new VTuple(11);
- tuple2.put(new Datum[] {
- DatumFactory.createBool(true),
- DatumFactory.createBit((byte) 0x99),
- DatumFactory.createChar('9'),
- DatumFactory.createInt2((short) 17),
- DatumFactory.createInt4(59),
- DatumFactory.createInt8(23l),
- DatumFactory.createFloat4(77.9f),
- DatumFactory.createFloat8(271.9f),
- DatumFactory.createText("hyunsik"),
- DatumFactory.createBlob("hyunsik".getBytes()),
- DatumFactory.createInet4("192.168.0.1")
- });
- }
-
- @After
- public void tearDown() throws Exception {
- }
-
- @Test
- public final void testFrameTuple() {
- Tuple frame = new FrameTuple(tuple1, tuple2);
- assertEquals(22, frame.size());
- for (int i = 0; i < 22; i++) {
- assertTrue(frame.contains(i));
- }
-
- assertEquals(DatumFactory.createInt8(23l), frame.get(5));
- assertEquals(DatumFactory.createInt8(23l), frame.get(16));
- assertEquals(DatumFactory.createInet4("192.168.0.1"), frame.get(10));
- assertEquals(DatumFactory.createInet4("192.168.0.1"), frame.get(21));
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/test/java/org/apache/tajo/storage/TestLazyTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestLazyTuple.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestLazyTuple.java
deleted file mode 100644
index c6149f7..0000000
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestLazyTuple.java
+++ /dev/null
@@ -1,258 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.common.TajoDataTypes;
-import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.datum.NullDatum;
-import org.apache.tajo.util.BytesUtils;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-public class TestLazyTuple {
-
- Schema schema;
- byte[][] textRow;
- byte[] nullbytes;
- SerializerDeserializer serde;
-
- @Before
- public void setUp() {
- nullbytes = "\\N".getBytes();
-
- schema = new Schema();
- schema.addColumn("col1", TajoDataTypes.Type.BOOLEAN);
- schema.addColumn("col2", TajoDataTypes.Type.BIT);
- schema.addColumn("col3", TajoDataTypes.Type.CHAR, 7);
- schema.addColumn("col4", TajoDataTypes.Type.INT2);
- schema.addColumn("col5", TajoDataTypes.Type.INT4);
- schema.addColumn("col6", TajoDataTypes.Type.INT8);
- schema.addColumn("col7", TajoDataTypes.Type.FLOAT4);
- schema.addColumn("col8", TajoDataTypes.Type.FLOAT8);
- schema.addColumn("col9", TajoDataTypes.Type.TEXT);
- schema.addColumn("col10", TajoDataTypes.Type.BLOB);
- schema.addColumn("col11", TajoDataTypes.Type.INET4);
- schema.addColumn("col12", TajoDataTypes.Type.INT4);
- schema.addColumn("col13", TajoDataTypes.Type.NULL_TYPE);
-
- StringBuilder sb = new StringBuilder();
- sb.append(DatumFactory.createBool(true)).append('|');
- sb.append(new String(DatumFactory.createBit((byte) 0x99).asTextBytes())).append('|');
- sb.append(DatumFactory.createChar("str")).append('|');
- sb.append(DatumFactory.createInt2((short) 17)).append('|');
- sb.append(DatumFactory.createInt4(59)).append('|');
- sb.append(DatumFactory.createInt8(23l)).append('|');
- sb.append(DatumFactory.createFloat4(77.9f)).append('|');
- sb.append(DatumFactory.createFloat8(271.9f)).append('|');
- sb.append(DatumFactory.createText("str2")).append('|');
- sb.append(DatumFactory.createBlob("jinho".getBytes())).append('|');
- sb.append(DatumFactory.createInet4("192.168.0.1")).append('|');
- sb.append(new String(nullbytes)).append('|');
- sb.append(NullDatum.get());
- textRow = BytesUtils.splitPreserveAllTokens(sb.toString().getBytes(), '|');
- serde = new TextSerializerDeserializer();
- }
-
- @Test
- public void testGetDatum() {
-
- LazyTuple t1 = new LazyTuple(schema, textRow, -1, nullbytes, serde);
- assertEquals(DatumFactory.createBool(true), t1.get(0));
- assertEquals(DatumFactory.createBit((byte) 0x99), t1.get(1));
- assertEquals(DatumFactory.createChar("str"), t1.get(2));
- assertEquals(DatumFactory.createInt2((short) 17), t1.get(3));
- assertEquals(DatumFactory.createInt4(59), t1.get(4));
- assertEquals(DatumFactory.createInt8(23l), t1.get(5));
- assertEquals(DatumFactory.createFloat4(77.9f), t1.get(6));
- assertEquals(DatumFactory.createFloat8(271.9f), t1.get(7));
- assertEquals(DatumFactory.createText("str2"), t1.get(8));
- assertEquals(DatumFactory.createBlob("jinho".getBytes()), t1.get(9));
- assertEquals(DatumFactory.createInet4("192.168.0.1"), t1.get(10));
- assertEquals(NullDatum.get(), t1.get(11));
- assertEquals(NullDatum.get(), t1.get(12));
- }
-
- @Test
- public void testContain() {
- int colNum = schema.size();
-
- LazyTuple t1 = new LazyTuple(schema, new byte[colNum][], -1);
- t1.put(0, DatumFactory.createInt4(1));
- t1.put(3, DatumFactory.createInt4(1));
- t1.put(7, DatumFactory.createInt4(1));
-
- assertTrue(t1.contains(0));
- assertFalse(t1.contains(1));
- assertFalse(t1.contains(2));
- assertTrue(t1.contains(3));
- assertFalse(t1.contains(4));
- assertFalse(t1.contains(5));
- assertFalse(t1.contains(6));
- assertTrue(t1.contains(7));
- assertFalse(t1.contains(8));
- assertFalse(t1.contains(9));
- assertFalse(t1.contains(10));
- assertFalse(t1.contains(11));
- assertFalse(t1.contains(12));
- }
-
- @Test
- public void testPut() {
- int colNum = schema.size();
- LazyTuple t1 = new LazyTuple(schema, new byte[colNum][], -1);
- t1.put(0, DatumFactory.createText("str"));
- t1.put(1, DatumFactory.createInt4(2));
- t1.put(11, DatumFactory.createFloat4(0.76f));
-
- assertTrue(t1.contains(0));
- assertTrue(t1.contains(1));
-
- assertEquals(t1.getText(0), "str");
- assertEquals(t1.get(1).asInt4(), 2);
- assertTrue(t1.get(11).asFloat4() == 0.76f);
- }
-
- @Test
- public void testEquals() {
- int colNum = schema.size();
- LazyTuple t1 = new LazyTuple(schema, new byte[colNum][], -1);
- LazyTuple t2 = new LazyTuple(schema, new byte[colNum][], -1);
-
- t1.put(0, DatumFactory.createInt4(1));
- t1.put(1, DatumFactory.createInt4(2));
- t1.put(3, DatumFactory.createInt4(2));
-
- t2.put(0, DatumFactory.createInt4(1));
- t2.put(1, DatumFactory.createInt4(2));
- t2.put(3, DatumFactory.createInt4(2));
-
- assertEquals(t1, t2);
-
- Tuple t3 = new VTuple(colNum);
- t3.put(0, DatumFactory.createInt4(1));
- t3.put(1, DatumFactory.createInt4(2));
- t3.put(3, DatumFactory.createInt4(2));
- assertEquals(t1, t3);
- assertEquals(t2, t3);
-
- LazyTuple t4 = new LazyTuple(schema, new byte[colNum][], -1);
- assertNotSame(t1, t4);
- }
-
- @Test
- public void testHashCode() {
- int colNum = schema.size();
- LazyTuple t1 = new LazyTuple(schema, new byte[colNum][], -1);
- LazyTuple t2 = new LazyTuple(schema, new byte[colNum][], -1);
-
- t1.put(0, DatumFactory.createInt4(1));
- t1.put(1, DatumFactory.createInt4(2));
- t1.put(3, DatumFactory.createInt4(2));
- t1.put(4, DatumFactory.createText("str"));
-
- t2.put(0, DatumFactory.createInt4(1));
- t2.put(1, DatumFactory.createInt4(2));
- t2.put(3, DatumFactory.createInt4(2));
- t2.put(4, DatumFactory.createText("str"));
-
- assertEquals(t1.hashCode(), t2.hashCode());
-
- Tuple t3 = new VTuple(colNum);
- t3.put(0, DatumFactory.createInt4(1));
- t3.put(1, DatumFactory.createInt4(2));
- t3.put(3, DatumFactory.createInt4(2));
- t3.put(4, DatumFactory.createText("str"));
- assertEquals(t1.hashCode(), t3.hashCode());
- assertEquals(t2.hashCode(), t3.hashCode());
-
- Tuple t4 = new VTuple(5);
- t4.put(0, DatumFactory.createInt4(1));
- t4.put(1, DatumFactory.createInt4(2));
- t4.put(4, DatumFactory.createInt4(2));
-
- assertNotSame(t1.hashCode(), t4.hashCode());
- }
-
- @Test
- public void testPutTuple() {
- int colNum = schema.size();
- LazyTuple t1 = new LazyTuple(schema, new byte[colNum][], -1);
-
- t1.put(0, DatumFactory.createInt4(1));
- t1.put(1, DatumFactory.createInt4(2));
- t1.put(2, DatumFactory.createInt4(3));
-
-
- Schema schema2 = new Schema();
- schema2.addColumn("col1", TajoDataTypes.Type.INT8);
- schema2.addColumn("col2", TajoDataTypes.Type.INT8);
-
- LazyTuple t2 = new LazyTuple(schema2, new byte[schema2.size()][], -1);
- t2.put(0, DatumFactory.createInt4(4));
- t2.put(1, DatumFactory.createInt4(5));
-
- t1.put(3, t2);
-
- for (int i = 0; i < 5; i++) {
- assertEquals(i + 1, t1.get(i).asInt4());
- }
- }
-
- @Test
- public void testInvalidNumber() {
- byte[][] bytes = BytesUtils.splitPreserveAllTokens(" 1| |2 ||".getBytes(), '|');
- Schema schema = new Schema();
- schema.addColumn("col1", TajoDataTypes.Type.INT2);
- schema.addColumn("col2", TajoDataTypes.Type.INT4);
- schema.addColumn("col3", TajoDataTypes.Type.INT8);
- schema.addColumn("col4", TajoDataTypes.Type.FLOAT4);
- schema.addColumn("col5", TajoDataTypes.Type.FLOAT8);
-
- LazyTuple tuple = new LazyTuple(schema, bytes, 0);
- assertEquals(bytes.length, tuple.size());
-
- for (int i = 0; i < tuple.size(); i++){
- assertEquals(NullDatum.get(), tuple.get(i));
- }
- }
-
- @Test
- public void testClone() throws CloneNotSupportedException {
- int colNum = schema.size();
- LazyTuple t1 = new LazyTuple(schema, new byte[colNum][], -1);
-
- t1.put(0, DatumFactory.createInt4(1));
- t1.put(1, DatumFactory.createInt4(2));
- t1.put(3, DatumFactory.createInt4(2));
- t1.put(4, DatumFactory.createText("str"));
-
- LazyTuple t2 = (LazyTuple) t1.clone();
- assertNotSame(t1, t2);
- assertEquals(t1, t2);
-
- assertSame(t1.get(4), t2.get(4));
-
- t1.clear();
- assertFalse(t1.equals(t2));
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/test/java/org/apache/tajo/storage/TestLineReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestLineReader.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestLineReader.java
deleted file mode 100644
index bfaba04..0000000
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestLineReader.java
+++ /dev/null
@@ -1,220 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import io.netty.buffer.ByteBuf;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.compress.DeflateCodec;
-import org.apache.tajo.catalog.CatalogUtil;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
-import org.apache.tajo.common.TajoDataTypes.Type;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.datum.NullDatum;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.storage.text.ByteBufLineReader;
-import org.apache.tajo.storage.text.DelimitedLineReader;
-import org.apache.tajo.storage.text.DelimitedTextFile;
-import org.apache.tajo.util.CommonTestingUtil;
-import org.apache.tajo.util.FileUtil;
-import org.junit.Test;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.junit.Assert.*;
-
-public class TestLineReader {
- private static String TEST_PATH = "target/test-data/TestLineReader";
-
- @Test
- public void testByteBufLineReader() throws IOException {
- TajoConf conf = new TajoConf();
- Path testDir = CommonTestingUtil.getTestDir(TEST_PATH);
- FileSystem fs = testDir.getFileSystem(conf);
-
- Schema schema = new Schema();
- schema.addColumn("id", Type.INT4);
- schema.addColumn("age", Type.INT8);
- schema.addColumn("comment", Type.TEXT);
- schema.addColumn("comment2", Type.TEXT);
-
- TableMeta meta = CatalogUtil.newTableMeta(StoreType.TEXTFILE);
- Path tablePath = new Path(testDir, "line.data");
- FileAppender appender = (FileAppender) StorageManager.getStorageManager(conf).getAppender(meta, schema,
- tablePath);
- appender.enableStats();
- appender.init();
- int tupleNum = 10000;
- VTuple vTuple;
-
- for (int i = 0; i < tupleNum; i++) {
- vTuple = new VTuple(4);
- vTuple.put(0, DatumFactory.createInt4(i + 1));
- vTuple.put(1, DatumFactory.createInt8(25l));
- vTuple.put(2, DatumFactory.createText("emiya muljomdao"));
- vTuple.put(3, NullDatum.get());
- appender.addTuple(vTuple);
- }
- appender.close();
-
- FileStatus status = fs.getFileStatus(tablePath);
-
- ByteBufInputChannel channel = new ByteBufInputChannel(fs.open(tablePath));
- ByteBufLineReader reader = new ByteBufLineReader(channel);
-
- long totalRead = 0;
- int i = 0;
- AtomicInteger bytes = new AtomicInteger();
- for(;;){
- ByteBuf buf = reader.readLineBuf(bytes);
- totalRead += bytes.get();
- if(buf == null) break;
- i++;
- }
- IOUtils.cleanup(null, reader, channel, fs);
- assertEquals(tupleNum, i);
- assertEquals(status.getLen(), totalRead);
- assertEquals(status.getLen(), reader.readBytes());
- }
-
- @Test
- public void testLineDelimitedReader() throws IOException {
- TajoConf conf = new TajoConf();
- Path testDir = CommonTestingUtil.getTestDir(TEST_PATH);
- FileSystem fs = testDir.getFileSystem(conf);
-
- Schema schema = new Schema();
- schema.addColumn("id", Type.INT4);
- schema.addColumn("age", Type.INT8);
- schema.addColumn("comment", Type.TEXT);
- schema.addColumn("comment2", Type.TEXT);
-
- TableMeta meta = CatalogUtil.newTableMeta(StoreType.TEXTFILE);
- meta.putOption("compression.codec", DeflateCodec.class.getCanonicalName());
-
- Path tablePath = new Path(testDir, "line1." + DeflateCodec.class.getSimpleName());
- FileAppender appender = (FileAppender) StorageManager.getStorageManager(conf).getAppender(meta, schema,
- tablePath);
- appender.enableStats();
- appender.init();
- int tupleNum = 10000;
- VTuple vTuple;
-
- long splitOffset = 0;
- for (int i = 0; i < tupleNum; i++) {
- vTuple = new VTuple(4);
- vTuple.put(0, DatumFactory.createInt4(i + 1));
- vTuple.put(1, DatumFactory.createInt8(25l));
- vTuple.put(2, DatumFactory.createText("emiya muljomdao"));
- vTuple.put(3, NullDatum.get());
- appender.addTuple(vTuple);
-
- if(i == (tupleNum / 2)){
- splitOffset = appender.getOffset();
- }
- }
- String extension = ((DelimitedTextFile.DelimitedTextFileAppender) appender).getExtension();
- appender.close();
-
- tablePath = tablePath.suffix(extension);
- FileFragment fragment = new FileFragment("table", tablePath, 0, splitOffset);
- DelimitedLineReader reader = new DelimitedLineReader(conf, fragment); // if file is compressed, will read to EOF
- assertTrue(reader.isCompressed());
- assertFalse(reader.isReadable());
- reader.init();
- assertTrue(reader.isReadable());
-
-
- int i = 0;
- while(reader.isReadable()){
- ByteBuf buf = reader.readLine();
- if(buf == null) break;
- i++;
- }
-
- IOUtils.cleanup(null, reader, fs);
- assertEquals(tupleNum, i);
-
- }
-
- @Test
- public void testByteBufLineReaderWithoutTerminating() throws IOException {
- String path = FileUtil.getResourcePath("dataset/testLineText.txt").getFile();
- File file = new File(path);
- String data = FileUtil.readTextFile(file);
-
- ByteBufInputChannel channel = new ByteBufInputChannel(new FileInputStream(file));
- ByteBufLineReader reader = new ByteBufLineReader(channel);
-
- long totalRead = 0;
- int i = 0;
- AtomicInteger bytes = new AtomicInteger();
- for(;;){
- ByteBuf buf = reader.readLineBuf(bytes);
- totalRead += bytes.get();
- if(buf == null) break;
- i++;
- }
- IOUtils.cleanup(null, reader);
- assertEquals(file.length(), totalRead);
- assertEquals(file.length(), reader.readBytes());
- assertEquals(data.split("\n").length, i);
- }
-
- @Test
- public void testCRLFLine() throws IOException {
- TajoConf conf = new TajoConf();
- Path testFile = new Path(CommonTestingUtil.getTestDir(TEST_PATH), "testCRLFLineText.txt");
-
- FileSystem fs = testFile.getFileSystem(conf);
- FSDataOutputStream outputStream = fs.create(testFile, true);
- outputStream.write("0\r\n1\r\n".getBytes());
- outputStream.flush();
- IOUtils.closeStream(outputStream);
-
- ByteBufInputChannel channel = new ByteBufInputChannel(fs.open(testFile));
- ByteBufLineReader reader = new ByteBufLineReader(channel, BufferPool.directBuffer(2));
- FileStatus status = fs.getFileStatus(testFile);
-
- long totalRead = 0;
- int i = 0;
- AtomicInteger bytes = new AtomicInteger();
- for(;;){
- ByteBuf buf = reader.readLineBuf(bytes);
- totalRead += bytes.get();
- if(buf == null) break;
- String row = buf.toString(Charset.defaultCharset());
- assertEquals(i, Integer.parseInt(row));
- i++;
- }
- IOUtils.cleanup(null, reader);
- assertEquals(status.getLen(), totalRead);
- assertEquals(status.getLen(), reader.readBytes());
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestMergeScanner.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
deleted file mode 100644
index e6714b5..0000000
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
+++ /dev/null
@@ -1,201 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.catalog.CatalogUtil;
-import org.apache.tajo.util.KeyValueSet;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.common.TajoDataTypes.Type;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.conf.TajoConf.ConfVars;
-import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.util.CommonTestingUtil;
-import org.apache.tajo.util.TUtil;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
-
-import static org.junit.Assert.*;
-
-@RunWith(Parameterized.class)
-public class TestMergeScanner {
- private TajoConf conf;
- StorageManager sm;
- private static String TEST_PATH = "target/test-data/TestMergeScanner";
-
- private static String TEST_MULTIPLE_FILES_AVRO_SCHEMA =
- "{\n" +
- " \"type\": \"record\",\n" +
- " \"namespace\": \"org.apache.tajo\",\n" +
- " \"name\": \"testMultipleFiles\",\n" +
- " \"fields\": [\n" +
- " { \"name\": \"id\", \"type\": \"int\" },\n" +
- " { \"name\": \"file\", \"type\": \"string\" },\n" +
- " { \"name\": \"name\", \"type\": \"string\" },\n" +
- " { \"name\": \"age\", \"type\": \"long\" }\n" +
- " ]\n" +
- "}\n";
-
- private Path testDir;
- private StoreType storeType;
- private FileSystem fs;
-
- public TestMergeScanner(StoreType storeType) {
- this.storeType = storeType;
- }
-
- @Parameters
- public static Collection<Object[]> generateParameters() {
- return Arrays.asList(new Object[][] {
- {StoreType.CSV},
- {StoreType.RAW},
- {StoreType.RCFILE},
- {StoreType.PARQUET},
- {StoreType.SEQUENCEFILE},
- {StoreType.AVRO},
- // RowFile requires Byte-buffer read support, so we omitted RowFile.
- //{StoreType.ROWFILE},
- });
- }
-
- @Before
- public void setup() throws Exception {
- conf = new TajoConf();
- conf.setVar(ConfVars.ROOT_DIR, TEST_PATH);
- conf.setStrings("tajo.storage.projectable-scanner", "rcfile", "parquet", "avro");
- testDir = CommonTestingUtil.getTestDir(TEST_PATH);
- fs = testDir.getFileSystem(conf);
- sm = StorageManager.getStorageManager(conf, testDir);
- }
-
- @Test
- public void testMultipleFiles() throws IOException {
- Schema schema = new Schema();
- schema.addColumn("id", Type.INT4);
- schema.addColumn("file", Type.TEXT);
- schema.addColumn("name", Type.TEXT);
- schema.addColumn("age", Type.INT8);
-
- KeyValueSet options = new KeyValueSet();
- TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
- meta.setOptions(CatalogUtil.newPhysicalProperties(storeType));
- if (storeType == StoreType.AVRO) {
- meta.putOption(StorageConstants.AVRO_SCHEMA_LITERAL,
- TEST_MULTIPLE_FILES_AVRO_SCHEMA);
- }
-
- Path table1Path = new Path(testDir, storeType + "_1.data");
- Appender appender1 = StorageManager.getStorageManager(conf).getAppender(meta, schema, table1Path);
- appender1.enableStats();
- appender1.init();
- int tupleNum = 10000;
- VTuple vTuple;
-
- for(int i = 0; i < tupleNum; i++) {
- vTuple = new VTuple(4);
- vTuple.put(0, DatumFactory.createInt4(i + 1));
- vTuple.put(1, DatumFactory.createText("hyunsik"));
- vTuple.put(2, DatumFactory.createText("jihoon"));
- vTuple.put(3, DatumFactory.createInt8(25l));
- appender1.addTuple(vTuple);
- }
- appender1.close();
-
- TableStats stat1 = appender1.getStats();
- if (stat1 != null) {
- assertEquals(tupleNum, stat1.getNumRows().longValue());
- }
-
- Path table2Path = new Path(testDir, storeType + "_2.data");
- Appender appender2 = StorageManager.getStorageManager(conf).getAppender(meta, schema, table2Path);
- appender2.enableStats();
- appender2.init();
-
- for(int i = 0; i < tupleNum; i++) {
- vTuple = new VTuple(4);
- vTuple.put(0, DatumFactory.createInt4(i + 1));
- vTuple.put(1, DatumFactory.createText("hyunsik"));
- vTuple.put(2, DatumFactory.createText("jihoon"));
- vTuple.put(3, DatumFactory.createInt8(25l));
- appender2.addTuple(vTuple);
- }
- appender2.close();
-
- TableStats stat2 = appender2.getStats();
- if (stat2 != null) {
- assertEquals(tupleNum, stat2.getNumRows().longValue());
- }
-
-
- FileStatus status1 = fs.getFileStatus(table1Path);
- FileStatus status2 = fs.getFileStatus(table2Path);
- FileFragment[] fragment = new FileFragment[2];
- fragment[0] = new FileFragment("tablet1", table1Path, 0, status1.getLen());
- fragment[1] = new FileFragment("tablet1", table2Path, 0, status2.getLen());
-
- Schema targetSchema = new Schema();
- targetSchema.addColumn(schema.getColumn(0));
- targetSchema.addColumn(schema.getColumn(2));
-
- Scanner scanner = new MergeScanner(conf, schema, meta, TUtil.<FileFragment>newList(fragment), targetSchema);
- assertEquals(isProjectableStorage(meta.getStoreType()), scanner.isProjectable());
-
- scanner.init();
- int totalCounts = 0;
- Tuple tuple;
- while ((tuple = scanner.next()) != null) {
- totalCounts++;
- if (isProjectableStorage(meta.getStoreType())) {
- assertNotNull(tuple.get(0));
- assertNull(tuple.get(1));
- assertNotNull(tuple.get(2));
- assertNull(tuple.get(3));
- }
- }
- scanner.close();
-
- assertEquals(tupleNum * 2, totalCounts);
- }
-
- private static boolean isProjectableStorage(StoreType type) {
- switch (type) {
- case RCFILE:
- case PARQUET:
- case SEQUENCEFILE:
- case CSV:
- case AVRO:
- return true;
- default:
- return false;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/test/java/org/apache/tajo/storage/TestSplitProcessor.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestSplitProcessor.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestSplitProcessor.java
deleted file mode 100644
index 12ea551..0000000
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestSplitProcessor.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-import io.netty.util.CharsetUtil;
-import org.apache.tajo.storage.text.FieldSplitProcessor;
-import org.apache.tajo.storage.text.LineSplitProcessor;
-import org.junit.Test;
-
-import java.io.IOException;
-
-import static io.netty.util.ReferenceCountUtil.releaseLater;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-public class TestSplitProcessor {
-
- @Test
- public void testFieldSplitProcessor() throws IOException {
- String data = "abc||de";
- final ByteBuf buf = releaseLater(
- Unpooled.copiedBuffer(data, CharsetUtil.ISO_8859_1));
-
- final int len = buf.readableBytes();
- FieldSplitProcessor processor = new FieldSplitProcessor('|');
-
- assertEquals(3, buf.forEachByte(0, len, processor));
- assertEquals(4, buf.forEachByte(4, len - 4, processor));
- assertEquals(-1, buf.forEachByte(5, len - 5, processor));
-
- }
-
- @Test
- public void testLineSplitProcessor() throws IOException {
- String data = "abc\r\n\n";
- final ByteBuf buf = releaseLater(
- Unpooled.copiedBuffer(data, CharsetUtil.ISO_8859_1));
-
- final int len = buf.readableBytes();
- LineSplitProcessor processor = new LineSplitProcessor();
-
- //find CR
- assertEquals(3, buf.forEachByte(0, len, processor));
-
- // find CRLF
- assertEquals(4, buf.forEachByte(4, len - 4, processor));
- assertEquals(buf.getByte(4), '\n');
- // need to skip LF
- assertTrue(processor.isPrevCharCR());
-
- // find LF
- assertEquals(5, buf.forEachByte(5, len - 5, processor)); //line length is zero
- }
-}