You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hj...@apache.org on 2014/12/05 09:21:25 UTC
[21/29] tajo git commit: Merge branch 'master' of
https://git-wip-us.apache.org/repos/asf/tajo into hbase_storage
http://git-wip-us.apache.org/repos/asf/tajo/blob/940546a0/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextFieldSerializerDeserializer.java
----------------------------------------------------------------------
diff --cc tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextFieldSerializerDeserializer.java
index 9722959,0000000..95d0407
mode 100644,000000..100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextFieldSerializerDeserializer.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextFieldSerializerDeserializer.java
@@@ -1,223 -1,0 +1,223 @@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.text;
+
+import com.google.protobuf.Message;
+import io.netty.buffer.ByteBuf;
+import io.netty.util.CharsetUtil;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.*;
+import org.apache.tajo.datum.protobuf.ProtobufJsonFormat;
+import org.apache.tajo.storage.FieldSerializerDeserializer;
+import org.apache.tajo.util.NumberUtil;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.CharsetDecoder;
+
++/**
++ * Serializes and deserializes individual fields of the delimited text format.
++ * The encoding aims to be compatible with Apache Hive.
++ */
+public class TextFieldSerializerDeserializer implements FieldSerializerDeserializer {
+ public static final byte[] trueBytes = "true".getBytes();
+ public static final byte[] falseBytes = "false".getBytes();
- private ProtobufJsonFormat protobufJsonFormat = ProtobufJsonFormat.getInstance();
++ private static ProtobufJsonFormat protobufJsonFormat = ProtobufJsonFormat.getInstance();
++ // Per-instance UTF-8 decoder used to turn field bytes into strings before parsing them.
+ private final CharsetDecoder decoder = CharsetUtil.getDecoder(CharsetUtil.UTF_8);
+
++ /**
++ * A non-text field is null when it is empty or equals the configured null sequence.
++ */
+ private static boolean isNull(ByteBuf val, ByteBuf nullBytes) {
+ return !val.isReadable() || nullBytes.equals(val);
+ }
+
++ /**
++ * A TEXT/CHAR field is null only when it is non-empty and equals the null sequence,
++ * so an empty field still reads back as an empty string.
++ */
+ private static boolean isNullText(ByteBuf val, ByteBuf nullBytes) {
+ return val.readableBytes() > 0 && nullBytes.equals(val);
+ }
+
++ /**
++ * Writes the text form of {@code datum} to {@code out}.
++ *
++ * @param out stream to write to
++ * @param datum value to write; {@code null} or a {@link NullDatum} is treated as null
++ * @param col column whose data type selects the encoding
++ * @param columnIndex index of the column (not used by this implementation)
++ * @param nullChars bytes written for a null CHAR/TEXT value; nulls of other types write nothing
++ * @return the number of bytes written
++ * @throws IOException if writing to {@code out} fails
++ */
+ @Override
+ public int serialize(OutputStream out, Datum datum, Column col, int columnIndex, byte[] nullChars) throws IOException {
+ byte[] bytes;
+ int length = 0;
+ TajoDataTypes.DataType dataType = col.getDataType();
+
+ if (datum == null || datum instanceof NullDatum) {
++ // Only CHAR/TEXT write the null sequence; other types leave the field empty.
+ switch (dataType.getType()) {
+ case CHAR:
+ case TEXT:
+ length = nullChars.length;
+ out.write(nullChars);
+ break;
+ default:
+ break;
+ }
+ return length;
+ }
+
+ switch (dataType.getType()) {
+ case BOOLEAN:
++ bytes = datum.asBool() ? trueBytes : falseBytes;
++ // Report the length of what was actually written ("false" is longer than "true").
++ length = bytes.length;
++ out.write(bytes);
+ break;
+ case CHAR:
++ bytes = datum.asTextBytes();
++ // Zero-pad up to the declared length (the CHAR reader trims it off). Clamp at zero so a
++ // value that already fills or exceeds the declared length cannot yield a negative array size.
++ byte[] pad = new byte[Math.max(0, dataType.getLength() - bytes.length)];
+ out.write(bytes);
+ out.write(pad);
+ length = bytes.length + pad.length;
+ break;
+ case TEXT:
+ case BIT:
++ // INT1 is accepted by deserialize, so it must be written as text as well; otherwise it
++ // would be emitted as an empty field and read back as NULL.
++ case INT1:
+ case INT2:
+ case INT4:
+ case INT8:
+ case FLOAT4:
+ case FLOAT8:
+ case INET4:
+ case DATE:
+ case INTERVAL:
+ bytes = datum.asTextBytes();
+ length = bytes.length;
+ out.write(bytes);
+ break;
+ case TIME:
++ // Encode as UTF-8 to match the UTF-8 decoder used when reading the field back.
++ bytes = ((TimeDatum) datum).asChars(TajoConf.getCurrentTimeZone(), true).getBytes(CharsetUtil.UTF_8);
+ length = bytes.length;
+ out.write(bytes);
+ break;
+ case TIMESTAMP:
++ bytes = ((TimestampDatum) datum).asChars(TajoConf.getCurrentTimeZone(), true).getBytes(CharsetUtil.UTF_8);
+ length = bytes.length;
+ out.write(bytes);
+ break;
+ case INET6:
+ case BLOB:
+ bytes = Base64.encodeBase64(datum.asByteArray(), false);
+ length = bytes.length;
+ out.write(bytes, 0, length);
+ break;
+ case PROTOBUF:
+ ProtobufDatum protobuf = (ProtobufDatum) datum;
+ byte[] protoBytes = protobufJsonFormat.printToString(protobuf.get()).getBytes();
+ length = protoBytes.length;
+ out.write(protoBytes, 0, protoBytes.length);
+ break;
+ case NULL_TYPE:
+ default:
+ break;
+ }
+ return length;
+ }
+
++ /**
++ * Parses one field from {@code buf} into a datum of the column's type.
++ *
++ * @param buf bytes of the field
++ * @param col column whose data type selects the parser
++ * @param columnIndex index of the column (not used by this implementation)
++ * @param nullChars byte sequence that denotes null
++ * @return the parsed datum, or {@link NullDatum} for null fields and unhandled types
++ * @throws IOException if the field bytes cannot be decoded
++ */
+ @Override
+ public Datum deserialize(ByteBuf buf, Column col, int columnIndex, ByteBuf nullChars) throws IOException {
+ Datum datum;
+ TajoDataTypes.Type type = col.getDataType().getType();
+ boolean nullField;
+ if (type == TajoDataTypes.Type.TEXT || type == TajoDataTypes.Type.CHAR) {
+ nullField = isNullText(buf, nullChars);
+ } else {
+ nullField = isNull(buf, nullChars);
+ }
+
+ if (nullField) {
+ datum = NullDatum.get();
+ } else {
+ switch (type) {
+ case BOOLEAN:
++ // Only the first byte is inspected: 't' or 'T' means true, anything else false.
+ byte bool = buf.readByte();
+ datum = DatumFactory.createBool(bool == 't' || bool == 'T');
+ break;
+ case BIT:
+ datum = DatumFactory.createBit(Byte.parseByte(
+ decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString()));
+ break;
+ case CHAR:
+ datum = DatumFactory.createChar(
+ decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString().trim());
+ break;
+ case INT1:
+ case INT2:
+ datum = DatumFactory.createInt2((short) NumberUtil.parseInt(buf));
+ break;
+ case INT4:
+ datum = DatumFactory.createInt4(NumberUtil.parseInt(buf));
+ break;
+ case INT8:
+ datum = DatumFactory.createInt8(NumberUtil.parseLong(buf));
+ break;
+ case FLOAT4:
+ datum = DatumFactory.createFloat4(
+ decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString());
+ break;
+ case FLOAT8:
+ datum = DatumFactory.createFloat8(NumberUtil.parseDouble(buf));
+ break;
+ case TEXT: {
+ byte[] bytes = new byte[buf.readableBytes()];
+ buf.readBytes(bytes);
+ datum = DatumFactory.createText(bytes);
+ break;
+ }
+ case DATE:
+ datum = DatumFactory.createDate(
+ decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString());
+ break;
+ case TIME:
+ datum = DatumFactory.createTime(
+ decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString());
+ break;
+ case TIMESTAMP:
+ datum = DatumFactory.createTimestamp(
+ decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString());
+ break;
+ case INTERVAL:
+ datum = DatumFactory.createInterval(
+ decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString());
+ break;
+ case PROTOBUF: {
+ ProtobufDatumFactory factory = ProtobufDatumFactory.get(col.getDataType());
+ Message.Builder builder = factory.newBuilder();
+ try {
+ byte[] bytes = new byte[buf.readableBytes()];
+ buf.readBytes(bytes);
+ protobufJsonFormat.merge(bytes, builder);
+ datum = factory.createDatum(builder.build());
+ } catch (IOException e) {
++ // The wrapped cause keeps the stack trace; no need to also dump it to stderr.
+ throw new RuntimeException(e);
+ }
+ break;
+ }
+ case INET4:
+ datum = DatumFactory.createInet4(
+ decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString());
+ break;
+ case BLOB: {
+ byte[] bytes = new byte[buf.readableBytes()];
+ buf.readBytes(bytes);
+ datum = DatumFactory.createBlob(Base64.decodeBase64(bytes));
+ break;
+ }
+ default:
+ datum = NullDatum.get();
+ break;
+ }
+ }
+ return datum;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/940546a0/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java
----------------------------------------------------------------------
diff --cc tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java
index 0000000,0000000..7ebfa79
new file mode 100644
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java
@@@ -1,0 -1,0 +1,60 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.tajo.storage.text;
++
++import io.netty.buffer.ByteBuf;
++import org.apache.tajo.catalog.Schema;
++import org.apache.tajo.catalog.TableMeta;
++import org.apache.tajo.storage.Tuple;
++
++import java.io.IOException;
++
++/**
++ * Base class for parsers that turn one line of text into the fields of a {@link Tuple}.
++ */
++public abstract class TextLineDeserializer {
++ /** Schema of the table being read. */
++ protected Schema schema;
++ /** Table metadata (options such as the null sequence are read from it). */
++ protected TableMeta meta;
++ /** Column indexes handed in by the creator; presumably the columns to materialize — confirm. */
++ protected int[] targetColumnIndexes;
++
++ public TextLineDeserializer(Schema inputSchema, TableMeta tableMeta, int[] targetColumns) {
++ this.meta = tableMeta;
++ this.targetColumnIndexes = targetColumns;
++ this.schema = inputSchema;
++ }
++
++ /**
++ * Initializes the deserializer.
++ */
++ public abstract void init();
++
++ /**
++ * Parses the fields of one line and stores them into {@code output}.
++ *
++ * @param buf bytes of the line that was read
++ * @param output tuple that receives the parsed fields
++ * @throws java.io.IOException if reading the field bytes fails
++ * @throws TextLineParsingError if the line cannot be parsed
++ */
++ public abstract void deserialize(final ByteBuf buf, Tuple output) throws IOException, TextLineParsingError;
++
++ /**
++ * Releases any external resources held by the deserializer.
++ */
++ public abstract void release();
++}
http://git-wip-us.apache.org/repos/asf/tajo/blob/940546a0/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineParsingError.java
----------------------------------------------------------------------
diff --cc tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineParsingError.java
index 0000000,0000000..f0bae5e
new file mode 100644
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineParsingError.java
@@@ -1,0 -1,0 +1,31 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.tajo.storage.text;
++
++/**
++ * Checked error raised when a text line cannot be parsed into a tuple.
++ */
++public class TextLineParsingError extends Exception {
++
++ /**
++ * Wraps a parsing failure.
++ *
++ * @param t underlying cause
++ */
++ public TextLineParsingError(Throwable t) {
++ super(t);
++ }
++
++ /**
++ * Wraps a parsing failure and records the offending line in the message. The cause is
++ * attached as well, so its stack trace is not lost.
++ *
++ * @param message the line that failed to parse
++ * @param t underlying cause; may be {@code null}
++ */
++ public TextLineParsingError(String message, Throwable t) {
++ super((t == null ? null : t.getMessage()) + ", Error line: " + message, t);
++ }
++
++}
http://git-wip-us.apache.org/repos/asf/tajo/blob/940546a0/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineSerDe.java
----------------------------------------------------------------------
diff --cc tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineSerDe.java
index 0000000,0000000..e81e289
new file mode 100644
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineSerDe.java
@@@ -1,0 -1,0 +1,65 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.tajo.storage.text;
++
++import io.netty.buffer.ByteBuf;
++import org.apache.commons.lang.StringEscapeUtils;
++import org.apache.commons.lang.StringUtils;
++import org.apache.tajo.catalog.Schema;
++import org.apache.tajo.catalog.TableMeta;
++import org.apache.tajo.datum.NullDatum;
++import org.apache.tajo.storage.BufferPool;
++import org.apache.tajo.storage.StorageConstants;
++
++/**
++ * Pluggable SerDe for text lines: creates a matching {@link TextLineDeserializer} and
++ * {@link TextLineSerializer}, and resolves the byte sequence a table uses for null.
++ */
++public abstract class TextLineSerDe {
++
++ public TextLineSerDe() {
++ }
++
++ /** Creates a deserializer for lines of a table with the given schema and metadata. */
++ public abstract TextLineDeserializer createDeserializer(Schema schema, TableMeta meta, int [] targetColumnIndexes);
++
++ /** Creates a serializer that writes tuples of the given schema as text lines. */
++ public abstract TextLineSerializer createSerializer(Schema schema, TableMeta meta);
++
++ /**
++ * Returns the table's null sequence copied into a buffer obtained from {@link BufferPool}.
++ * NOTE(review): the buffer comes from a pool, so the caller presumably has to release it — confirm.
++ */
++ public static ByteBuf getNullChars(TableMeta meta) {
++ final byte[] raw = getNullCharsAsBytes(meta);
++ final int size = raw.length;
++ final ByteBuf buffer = BufferPool.directBuffer(size, size);
++ buffer.writeBytes(raw);
++ return buffer;
++ }
++
++ /**
++ * Resolves the null sequence: the Java-unescaped TEXT_NULL option (defaulting to
++ * NullDatum.DEFAULT_TEXT), or NullDatum's text bytes when that value is empty.
++ */
++ public static byte [] getNullCharsAsBytes(TableMeta meta) {
++ String configured = meta.getOption(StorageConstants.TEXT_NULL, NullDatum.DEFAULT_TEXT);
++ String unescaped = StringEscapeUtils.unescapeJava(configured);
++ return StringUtils.isEmpty(unescaped) ? NullDatum.get().asTextBytes() : unescaped.getBytes();
++ }
++
++}
http://git-wip-us.apache.org/repos/asf/tajo/blob/940546a0/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineSerializer.java
----------------------------------------------------------------------
diff --cc tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineSerializer.java
index 0000000,0000000..0c2761f
new file mode 100644
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineSerializer.java
@@@ -1,0 -1,0 +1,45 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.tajo.storage.text;
++
++import org.apache.tajo.catalog.Schema;
++import org.apache.tajo.catalog.TableMeta;
++import org.apache.tajo.storage.Tuple;
++
++import java.io.IOException;
++import java.io.OutputStream;
++
++/**
++ * Base class for writers that render a {@link Tuple} as a single line of text.
++ */
++public abstract class TextLineSerializer {
++ /** Schema of the tuples being written. */
++ protected Schema schema;
++ /** Table metadata (options such as the null sequence are read from it). */
++ protected TableMeta meta;
++
++ public TextLineSerializer(Schema outputSchema, TableMeta tableMeta) {
++ this.meta = tableMeta;
++ this.schema = outputSchema;
++ }
++
++ /** Initializes the serializer. */
++ public abstract void init();
++
++ /**
++ * Writes {@code input} to {@code out} as one text line.
++ *
++ * @return an int result; presumably the number of bytes written — confirm against implementations
++ * @throws IOException if writing to {@code out} fails
++ */
++ public abstract int serialize(OutputStream out, Tuple input) throws IOException;
++
++ /** Releases any external resources held by the serializer. */
++ public abstract void release();
++}
http://git-wip-us.apache.org/repos/asf/tajo/blob/940546a0/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java
----------------------------------------------------------------------
diff --cc tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java
index 088fda9,0000000..ff7fe13
mode 100644,000000..100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java
@@@ -1,129 -1,0 +1,137 @@@
- /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
- package org.apache.tajo.storage;
-
- import org.apache.hadoop.fs.FileStatus;
- import org.apache.hadoop.fs.FileSystem;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.fs.s3.S3FileSystem;
- import org.apache.hadoop.hdfs.DFSConfigKeys;
- import org.apache.tajo.catalog.CatalogUtil;
- import org.apache.tajo.catalog.Schema;
- import org.apache.tajo.catalog.TableMeta;
- import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
- import org.apache.tajo.common.TajoDataTypes.Type;
- import org.apache.tajo.conf.TajoConf;
- import org.apache.tajo.datum.Datum;
- import org.apache.tajo.datum.DatumFactory;
- import org.apache.tajo.storage.fragment.Fragment;
- import org.apache.tajo.storage.s3.InMemoryFileSystemStore;
- import org.apache.tajo.storage.s3.SmallBlockS3FileSystem;
- import org.junit.Test;
- import org.junit.runner.RunWith;
- import org.junit.runners.Parameterized;
-
- import java.io.IOException;
- import java.net.URI;
- import java.util.Arrays;
- import java.util.Collection;
- import java.util.List;
-
- import static org.junit.Assert.assertEquals;
- import static org.junit.Assert.assertTrue;
-
- /**
- * Block-split test parameterized over S3-style file systems. For S3FileSystem instances the
- * block size is set to 10 bytes so that (presumably) even the small test file spans several blocks.
- */
- @RunWith(Parameterized.class)
- public class TestFileSystems {
-
- protected byte[] data = null;
-
- private static String TEST_PATH = "target/test-data/TestFileSystem";
- private TajoConf conf = null;
- private FileStorageManager sm = null;
- private FileSystem fs = null;
- Path testDir;
-
- public TestFileSystems(FileSystem fs) throws IOException {
- conf = new TajoConf();
-
- // Shrink the block size to 10 bytes and (re)initialize the S3 file system with that conf.
- if(fs instanceof S3FileSystem){
- conf.set(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, "10");
- fs.initialize(URI.create(fs.getScheme() + ":///"), conf);
- }
- this.fs = fs;
- sm = (FileStorageManager)StorageManager.getFileStorageManager(conf);
- testDir = getTestDir(this.fs, TEST_PATH);
- }
-
- // Recreates dir as an empty directory on fs and returns its qualified path.
- public Path getTestDir(FileSystem fs, String dir) throws IOException {
- Path path = new Path(dir);
- if(fs.exists(path))
- fs.delete(path, true);
-
- fs.mkdirs(path);
-
- return fs.makeQualified(path);
- }
-
- @Parameterized.Parameters
- public static Collection<Object[]> generateParameters() {
- return Arrays.asList(new Object[][] {
- {new SmallBlockS3FileSystem(new InMemoryFileSystemStore())},
- });
- }
-
- // Writes four CSV tuples, then checks the split count equals ceil(fileLength / blockSize)
- // and that no fragment is longer than one block.
- @Test
- public void testBlockSplit() throws IOException {
-
- Schema schema = new Schema();
- schema.addColumn("id", Type.INT4);
- schema.addColumn("age", Type.INT4);
- schema.addColumn("name", Type.TEXT);
-
- TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV);
-
- Tuple[] tuples = new Tuple[4];
- for (int i = 0; i < tuples.length; i++) {
- tuples[i] = new VTuple(3);
- tuples[i]
- .put(new Datum[] { DatumFactory.createInt4(i),
- DatumFactory.createInt4(i + 32),
- DatumFactory.createText("name" + i) });
- }
-
- Path path = StorageUtil.concatPath(testDir, "testGetScannerAndAppender",
- "table.csv");
- fs.mkdirs(path.getParent());
-
- Appender appender = sm.getAppender(meta, schema, path);
- appender.init();
- for (Tuple t : tuples) {
- appender.addTuple(t);
- }
- appender.close();
- FileStatus fileStatus = fs.getFileStatus(path);
-
- List<Fragment> splits = sm.getSplits("table", meta, schema, path);
- int splitSize = (int) Math.ceil(fileStatus.getLen() / (double) fileStatus.getBlockSize());
- assertEquals(splitSize, splits.size());
-
- for (Fragment fragment : splits) {
- assertTrue(fragment.getLength() <= fileStatus.getBlockSize());
- }
- }
- }
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.tajo.storage;
++
++import org.apache.hadoop.fs.FileStatus;
++import org.apache.hadoop.fs.FileSystem;
++import org.apache.hadoop.fs.LocalFileSystem;
++import org.apache.hadoop.fs.Path;
++import org.apache.tajo.catalog.CatalogUtil;
++import org.apache.tajo.catalog.Schema;
++import org.apache.tajo.catalog.TableMeta;
++import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
++import org.apache.tajo.common.TajoDataTypes.Type;
++import org.apache.tajo.conf.TajoConf;
++import org.apache.tajo.datum.Datum;
++import org.apache.tajo.datum.DatumFactory;
++import org.apache.tajo.storage.fragment.Fragment;
++import org.junit.After;
++import org.junit.Before;
++import org.junit.Test;
++import org.junit.runner.RunWith;
++import org.junit.runners.Parameterized;
++
++import java.io.IOException;
++import java.net.URI;
++import java.util.Arrays;
++import java.util.Collection;
++import java.util.List;
++
++import static org.junit.Assert.assertEquals;
++import static org.junit.Assert.assertTrue;
++
++/**
++ * Checks that {@link FileStorageManager#getSplits} produces fragments consistent with the file
++ * system's block size: the split count equals ceil(fileLength / blockSize) and no fragment is
++ * longer than one block. Runs once per parameterized {@link FileSystem}.
++ */
++@RunWith(Parameterized.class)
++public class TestFileSystems {
++
++ private static final String TEST_PATH = "target/test-data/TestFileSystem";
++ private TajoConf conf;
++ private FileStorageManager sm;
++ private FileSystem fs;
++ private Path testDir;
++
++ public TestFileSystems(FileSystem fs) throws IOException {
++ this.fs = fs;
++ this.conf = new TajoConf(fs.getConf());
++ sm = (FileStorageManager)StorageManager.getFileStorageManager(conf);
++ testDir = getTestDir(this.fs, TEST_PATH);
++ }
++
++ /**
++ * Recreates {@code dir} as an empty directory on {@code fs}.
++ *
++ * @return the fully qualified path of the directory
++ */
++ public Path getTestDir(FileSystem fs, String dir) throws IOException {
++ Path path = new Path(dir);
++ if (fs.exists(path)) {
++ fs.delete(path, true);
++ }
++
++ fs.mkdirs(path);
++
++ return fs.makeQualified(path);
++ }
++
++ @Parameterized.Parameters
++ public static Collection<Object[]> generateParameters() throws IOException {
++ return Arrays.asList(new Object[][]{
++ {FileSystem.getLocal(new TajoConf())},
++ });
++ }
++
++ @Before
++ public void setup() throws IOException {
++ // NOTE(review): the only parameter is FileSystem.getLocal(), which is a LocalFileSystem, so this
++ // branch never runs and testBlockSplit uses the default block size — confirm this is intended.
++ if (!(fs instanceof LocalFileSystem)) {
++ conf.set("fs.local.block.size", "10");
++ fs.initialize(URI.create(fs.getScheme() + ":///"), conf);
++ fs.setConf(conf);
++ }
++ }
++
++ @After
++ public void tearDown() throws IOException {
++ // Undo the configuration change made in setup() for file systems it touched.
++ if (!(fs instanceof LocalFileSystem)) {
++ fs.setConf(new TajoConf());
++ }
++ }
++
++ @Test
++ public void testBlockSplit() throws IOException {
++
++ Schema schema = new Schema();
++ schema.addColumn("id", Type.INT4);
++ schema.addColumn("age", Type.INT4);
++ schema.addColumn("name", Type.TEXT);
++
++ TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV);
++
++ Tuple[] tuples = new Tuple[4];
++ for (int i = 0; i < tuples.length; i++) {
++ tuples[i] = new VTuple(3);
++ tuples[i]
++ .put(new Datum[]{DatumFactory.createInt4(i),
++ DatumFactory.createInt4(i + 32),
++ DatumFactory.createText("name" + i)});
++ }
++
++ Path path = StorageUtil.concatPath(testDir, "testGetScannerAndAppender",
++ "table.csv");
++ fs.mkdirs(path.getParent());
++
++ Appender appender = sm.getAppender(meta, schema, path);
++ appender.init();
++ try {
++ for (Tuple t : tuples) {
++ appender.addTuple(t);
++ }
++ } finally {
++ // Close even if a write fails so the file handle is not leaked.
++ appender.close();
++ }
++ FileStatus fileStatus = fs.getFileStatus(path);
++
++ List<Fragment> splits = sm.getSplits("table", meta, schema, path);
++ int splitSize = (int) Math.ceil(fileStatus.getLen() / (double) fileStatus.getBlockSize());
++ assertEquals(splitSize, splits.size());
++
++ for (Fragment fragment : splits) {
++ assertTrue(fragment.getLength() <= fileStatus.getBlockSize());
++ }
++ }
++}
http://git-wip-us.apache.org/repos/asf/tajo/blob/940546a0/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
----------------------------------------------------------------------
diff --cc tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
index 4081a80,0000000..15998f2
mode 100644,000000..100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
@@@ -1,867 -1,0 +1,878 @@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.TajoIdProtos;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.datum.ProtobufDatumFactory;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.rcfile.RCFile;
+import org.apache.tajo.storage.sequencefile.SequenceFileScanner;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.FileUtil;
+import org.apache.tajo.util.KeyValueSet;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(Parameterized.class)
+public class TestStorages {
+ private TajoConf conf;
+ /** Path handed to CommonTestingUtil.getTestDir() to obtain this class's data directory. */
+ private static final String TEST_PATH = "target/test-data/TestStorages";
+
+ /** Avro schema used by testProjection for the (id, age, score) table. */
+ private static final String TEST_PROJECTION_AVRO_SCHEMA =
+ "{\n" +
+ " \"type\": \"record\",\n" +
+ " \"namespace\": \"org.apache.tajo\",\n" +
+ " \"name\": \"testProjection\",\n" +
+ " \"fields\": [\n" +
+ " { \"name\": \"id\", \"type\": \"int\" },\n" +
+ " { \"name\": \"age\", \"type\": \"long\" },\n" +
+ " { \"name\": \"score\", \"type\": \"float\" }\n" +
+ " ]\n" +
+ "}\n";
+
+ /**
+ * Avro schema used by testNullHandlingTypes. Field order follows that test's
+ * Tajo columns; every field is a ["null", T] union except col11, which is
+ * Avro's plain "null" type to match the NULL_TYPE column.
+ */
+ private static final String TEST_NULL_HANDLING_TYPES_AVRO_SCHEMA =
+ "{\n" +
+ " \"type\": \"record\",\n" +
+ " \"namespace\": \"org.apache.tajo\",\n" +
+ " \"name\": \"testNullHandlingTypes\",\n" +
+ " \"fields\": [\n" +
+ " { \"name\": \"col1\", \"type\": [\"null\", \"boolean\"] },\n" +
- " { \"name\": \"col2\", \"type\": [\"null\", \"int\"] },\n" +
- " { \"name\": \"col3\", \"type\": [\"null\", \"string\"] },\n" +
++ " { \"name\": \"col2\", \"type\": [\"null\", \"string\"] },\n" +
++ " { \"name\": \"col3\", \"type\": [\"null\", \"int\"] },\n" +
+ " { \"name\": \"col4\", \"type\": [\"null\", \"int\"] },\n" +
- " { \"name\": \"col5\", \"type\": [\"null\", \"int\"] },\n" +
- " { \"name\": \"col6\", \"type\": [\"null\", \"long\"] },\n" +
- " { \"name\": \"col7\", \"type\": [\"null\", \"float\"] },\n" +
- " { \"name\": \"col8\", \"type\": [\"null\", \"double\"] },\n" +
- " { \"name\": \"col9\", \"type\": [\"null\", \"string\"] },\n" +
++ " { \"name\": \"col5\", \"type\": [\"null\", \"long\"] },\n" +
++ " { \"name\": \"col6\", \"type\": [\"null\", \"float\"] },\n" +
++ " { \"name\": \"col7\", \"type\": [\"null\", \"double\"] },\n" +
++ " { \"name\": \"col8\", \"type\": [\"null\", \"string\"] },\n" +
++ " { \"name\": \"col9\", \"type\": [\"null\", \"bytes\"] },\n" +
+ " { \"name\": \"col10\", \"type\": [\"null\", \"bytes\"] },\n" +
- " { \"name\": \"col11\", \"type\": [\"null\", \"bytes\"] },\n" +
- " { \"name\": \"col12\", \"type\": \"null\" },\n" +
- " { \"name\": \"col13\", \"type\": [\"null\", \"bytes\"] }\n" +
++ " { \"name\": \"col11\", \"type\": \"null\" },\n" +
++ " { \"name\": \"col12\", \"type\": [\"null\", \"bytes\"] }\n" +
+ " ]\n" +
+ "}\n";
+
+ /** Storage format under test, plus the capabilities expected of it. */
+ private StoreType storeType;
+ private boolean splitable;
+ private boolean statsable;
+ private boolean seekable;
+ /** Data directory for this run and the FileSystem that owns it. */
+ private Path testDir;
+ private FileSystem fs;
+
+ /**
+ * Builds one parameterized instance from a row of generateParameters().
+ * RCFile additionally gets RECORD_INTERVAL_CONF_STR lowered to 100.
+ */
+ public TestStorages(StoreType type, boolean splitable, boolean statsable, boolean seekable) throws IOException {
+ this.seekable = seekable;
+ this.statsable = statsable;
+ this.splitable = splitable;
+ this.storeType = type;
+
+ TajoConf tajoConf = new TajoConf();
+ if (type == StoreType.RCFILE) {
+ tajoConf.setInt(RCFile.RECORD_INTERVAL_CONF_STR, 100);
+ }
+ conf = tajoConf;
+
+ testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+ fs = testDir.getFileSystem(conf);
+ }
+
+ /**
+ * Parameter rows for the JUnit Parameterized runner. Each row is passed to the
+ * constructor as (storeType, splitable, statsable, seekable), so every test
+ * method in this class runs once per storage format listed here.
+ */
+ @Parameterized.Parameters
+ public static Collection<Object[]> generateParameters() {
+ return Arrays.asList(new Object[][] {
+ //type, splitable, statsable, seekable
+ {StoreType.CSV, true, true, true},
+ {StoreType.RAW, false, true, true},
+ {StoreType.RCFILE, true, true, false},
+ {StoreType.PARQUET, false, false, false},
+ {StoreType.SEQUENCEFILE, true, true, false},
+ {StoreType.AVRO, false, false, false},
+ {StoreType.TEXTFILE, true, true, false},
++ {StoreType.JSON, true, true, false},
+ });
+ }
+
+ /**
+ * For splittable formats, writes 10,000 rows, cuts the file at a random byte
+ * offset into two fragments, and checks that the two fragments together
+ * return all 10,000 rows.
+ */
+ @Test
+ public void testSplitable() throws IOException {
+ if (splitable) {
+ Schema schema = new Schema();
+ schema.addColumn("id", Type.INT4);
+ schema.addColumn("age", Type.INT8);
+
+ TableMeta meta = CatalogUtil.newTableMeta(storeType);
+ Path tablePath = new Path(testDir, "Splitable.data");
+ FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(conf);
+ Appender appender = sm.getAppender(meta, schema, tablePath);
+ appender.enableStats();
+ appender.init();
+ int tupleNum = 10000;
+ VTuple vTuple;
+
+ for (int i = 0; i < tupleNum; i++) {
+ vTuple = new VTuple(2);
+ vTuple.put(0, DatumFactory.createInt4(i + 1));
+ vTuple.put(1, DatumFactory.createInt8(25L));
+ appender.addTuple(vTuple);
+ }
+ appender.close();
+ TableStats stat = appender.getStats();
+ assertEquals(tupleNum, stat.getNumRows().longValue());
+
+ FileStatus status = fs.getFileStatus(tablePath);
+ long fileLen = status.getLen();
+ // Split point lies in [1, fileLen]; fileLen leaves the second fragment empty.
+ long randomNum = (long) (Math.random() * fileLen) + 1;
+
+ FileFragment[] tablets = new FileFragment[2];
+ tablets[0] = new FileFragment("Splitable", tablePath, 0, randomNum);
+ tablets[1] = new FileFragment("Splitable", tablePath, randomNum, (fileLen - randomNum));
+
+ int tupleCnt = 0;
+ for (FileFragment tablet : tablets) {
+ Scanner scanner = sm.getScanner(meta, schema, tablet, schema);
+ assertTrue(scanner.isSplittable());
+ scanner.init();
+ try {
+ while (scanner.next() != null) {
+ tupleCnt++;
+ }
+ } finally {
+ // Release the file handle even if next() throws.
+ scanner.close();
+ }
+ }
+
+ assertEquals(tupleNum, tupleCnt);
+ }
+ }
+
+ /**
+ * RCFile only: writes 10,000 rows, splits the file into a fragment covering
+ * the first 122 bytes (the header size, per the inline note) and a fragment
+ * covering the rest, and checks that together they return all rows.
+ */
+ @Test
+ public void testRCFileSplitable() throws IOException {
+ if (storeType == StoreType.RCFILE) {
+ Schema schema = new Schema();
+ schema.addColumn("id", Type.INT4);
+ schema.addColumn("age", Type.INT8);
+
+ TableMeta meta = CatalogUtil.newTableMeta(storeType);
+ Path tablePath = new Path(testDir, "Splitable.data");
+ FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(conf);
+ Appender appender = sm.getAppender(meta, schema, tablePath);
+ appender.enableStats();
+ appender.init();
+ int tupleNum = 10000;
+ VTuple vTuple;
+
+ for (int i = 0; i < tupleNum; i++) {
+ vTuple = new VTuple(2);
+ vTuple.put(0, DatumFactory.createInt4(i + 1));
+ vTuple.put(1, DatumFactory.createInt8(25L));
+ appender.addTuple(vTuple);
+ }
+ appender.close();
+ TableStats stat = appender.getStats();
+ assertEquals(tupleNum, stat.getNumRows().longValue());
+
+ FileStatus status = fs.getFileStatus(tablePath);
+ long fileLen = status.getLen();
+ long headerSize = 122; // header size
+
+ FileFragment[] tablets = new FileFragment[2];
+ tablets[0] = new FileFragment("Splitable", tablePath, 0, headerSize);
+ tablets[1] = new FileFragment("Splitable", tablePath, headerSize, (fileLen - headerSize));
+
+ int tupleCnt = 0;
+ for (FileFragment tablet : tablets) {
+ Scanner scanner = sm.getScanner(meta, schema, tablet, schema);
+ assertTrue(scanner.isSplittable());
+ scanner.init();
+ try {
+ while (scanner.next() != null) {
+ tupleCnt++;
+ }
+ } finally {
+ // Release the file handle even if next() throws.
+ scanner.close();
+ }
+ }
+
+ assertEquals(tupleNum, tupleCnt);
+ }
+ }
+
+ /**
+ * Writes (id, age, score) rows, reads them back with a target schema of only
+ * (age, score), and checks the projected values. For the formats listed in
+ * the loop, the non-projected id column must come back as null.
+ */
+ @Test
+ public void testProjection() throws IOException {
+ Schema schema = new Schema();
+ schema.addColumn("id", Type.INT4);
+ schema.addColumn("age", Type.INT8);
+ schema.addColumn("score", Type.FLOAT4);
+
+ TableMeta meta = CatalogUtil.newTableMeta(storeType);
+ meta.setOptions(CatalogUtil.newPhysicalProperties(storeType));
+ if (storeType == StoreType.AVRO) {
+ // Avro is given an explicit schema literal matching the Tajo schema above.
+ meta.putOption(StorageConstants.AVRO_SCHEMA_LITERAL,
+ TEST_PROJECTION_AVRO_SCHEMA);
+ }
+
+ Path tablePath = new Path(testDir, "testProjection.data");
+ FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(conf);
+ Appender appender = sm.getAppender(meta, schema, tablePath);
+ appender.init();
+ int tupleNum = 10000;
+ VTuple vTuple;
+
+ for (int i = 0; i < tupleNum; i++) {
+ vTuple = new VTuple(3);
+ vTuple.put(0, DatumFactory.createInt4(i + 1));
+ vTuple.put(1, DatumFactory.createInt8(i + 2));
+ vTuple.put(2, DatumFactory.createFloat4(i + 3));
+ appender.addTuple(vTuple);
+ }
+ appender.close();
+
+ FileStatus status = fs.getFileStatus(tablePath);
+ FileFragment fragment = new FileFragment("testReadAndWrite", tablePath, 0, status.getLen());
+
+ // Request only age and score; id is written but not projected.
+ Schema target = new Schema();
+ target.addColumn("age", Type.INT8);
+ target.addColumn("score", Type.FLOAT4);
+ Scanner scanner = sm.getScanner(meta, schema, fragment, target);
+ scanner.init();
+ int tupleCnt = 0;
+ Tuple tuple;
+ while ((tuple = scanner.next()) != null) {
+ // Only these formats are asserted to leave the non-projected column null;
+ // RAW, TEXTFILE and JSON are not checked for it.
+ if (storeType == StoreType.RCFILE
+ || storeType == StoreType.CSV
+ || storeType == StoreType.PARQUET
+ || storeType == StoreType.SEQUENCEFILE
+ || storeType == StoreType.AVRO) {
+ assertTrue(tuple.get(0) == null);
+ }
+ // Projected values keep their full-schema positions: age at 1, score at 2.
+ assertTrue(tupleCnt + 2 == tuple.get(1).asInt8());
+ assertTrue(tupleCnt + 3 == tuple.get(2).asFloat4());
+ tupleCnt++;
+ }
+ scanner.close();
+
+ assertEquals(tupleNum, tupleCnt);
+ }
+
+ /**
+ * Round-trips a single row holding one value of each supported column type
+ * and checks that every column reads back equal. JSON skips the PROTOBUF
+ * column (handleProtobuf == false).
+ */
+ @Test
+ public void testVariousTypes() throws IOException {
++ boolean handleProtobuf = storeType != StoreType.JSON;
++
+ Schema schema = new Schema();
+ schema.addColumn("col1", Type.BOOLEAN);
- schema.addColumn("col2", Type.BIT);
- schema.addColumn("col3", Type.CHAR, 7);
- schema.addColumn("col4", Type.INT2);
- schema.addColumn("col5", Type.INT4);
- schema.addColumn("col6", Type.INT8);
- schema.addColumn("col7", Type.FLOAT4);
- schema.addColumn("col8", Type.FLOAT8);
- schema.addColumn("col9", Type.TEXT);
- schema.addColumn("col10", Type.BLOB);
- schema.addColumn("col11", Type.INET4);
- schema.addColumn("col12", Type.NULL_TYPE);
- schema.addColumn("col13", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName()));
++ schema.addColumn("col2", Type.CHAR, 7);
++ schema.addColumn("col3", Type.INT2);
++ schema.addColumn("col4", Type.INT4);
++ schema.addColumn("col5", Type.INT8);
++ schema.addColumn("col6", Type.FLOAT4);
++ schema.addColumn("col7", Type.FLOAT8);
++ schema.addColumn("col8", Type.TEXT);
++ schema.addColumn("col9", Type.BLOB);
++ schema.addColumn("col10", Type.INET4);
++ schema.addColumn("col11", Type.NULL_TYPE);
++ if (handleProtobuf) {
++ schema.addColumn("col12", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName()));
++ }
+
+ KeyValueSet options = new KeyValueSet();
+ TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
+ meta.setOptions(CatalogUtil.newPhysicalProperties(storeType));
+ if (storeType == StoreType.AVRO) {
- String path = FileUtil.getResourcePath("testVariousTypes.avsc").toString();
++ String path = FileUtil.getResourcePath("dataset/testVariousTypes.avsc").toString();
+ meta.putOption(StorageConstants.AVRO_SCHEMA_URL, path);
+ }
+
+ FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(conf);
+ Path tablePath = new Path(testDir, "testVariousTypes.data");
+ Appender appender = sm.getAppender(meta, schema, tablePath);
+ appender.init();
+
+ QueryId queryid = new QueryId("12345", 5);
+ ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName());
+
- Tuple tuple = new VTuple(13);
++ Tuple tuple = new VTuple(11 + (handleProtobuf ? 1 : 0));
+ tuple.put(new Datum[] {
+ DatumFactory.createBool(true),
- DatumFactory.createBit((byte) 0x99),
+ DatumFactory.createChar("hyunsik"),
+ DatumFactory.createInt2((short) 17),
+ DatumFactory.createInt4(59),
+ DatumFactory.createInt8(23L),
+ DatumFactory.createFloat4(77.9f),
+ DatumFactory.createFloat8(271.9f),
+ DatumFactory.createText("hyunsik"),
+ DatumFactory.createBlob("hyunsik".getBytes()),
+ DatumFactory.createInet4("192.168.0.1"),
- NullDatum.get(),
- factory.createDatum(queryid.getProto())
++ NullDatum.get()
+ });
++ // The PROTOBUF value goes in the 12th slot only when that column exists.
++ if (handleProtobuf) {
++ tuple.put(11, factory.createDatum(queryid.getProto()));
++ }
++
+ appender.addTuple(tuple);
+ appender.flush();
+ appender.close();
+
+ FileStatus status = fs.getFileStatus(tablePath);
+ FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
+ Scanner scanner = sm.getScanner(meta, schema, fragment);
+ scanner.init();
+
+ Tuple retrieved;
+ int rowCount = 0;
+ while ((retrieved = scanner.next()) != null) {
+ for (int i = 0; i < tuple.size(); i++) {
+ assertEquals(tuple.get(i), retrieved.get(i));
+ }
+ rowCount++;
+ }
+ scanner.close();
+ // Without this, the test would pass vacuously if the scanner returned no rows.
+ assertEquals(1, rowCount);
+ }
+
+ /**
+ * Writes one row per column. In row i, column i is NULL and every other
+ * column holds the seed value. Checks that each row reads back with the NULL
+ * in the same position. JSON skips the PROTOBUF column (handleProtobuf == false).
+ */
+ @Test
+ public void testNullHandlingTypes() throws IOException {
++ boolean handleProtobuf = storeType != StoreType.JSON;
++
+ Schema schema = new Schema();
+ schema.addColumn("col1", Type.BOOLEAN);
- schema.addColumn("col2", Type.BIT);
- schema.addColumn("col3", Type.CHAR, 7);
- schema.addColumn("col4", Type.INT2);
- schema.addColumn("col5", Type.INT4);
- schema.addColumn("col6", Type.INT8);
- schema.addColumn("col7", Type.FLOAT4);
- schema.addColumn("col8", Type.FLOAT8);
- schema.addColumn("col9", Type.TEXT);
- schema.addColumn("col10", Type.BLOB);
- schema.addColumn("col11", Type.INET4);
- schema.addColumn("col12", Type.NULL_TYPE);
- schema.addColumn("col13", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName()));
++ schema.addColumn("col2", Type.CHAR, 7);
++ schema.addColumn("col3", Type.INT2);
++ schema.addColumn("col4", Type.INT4);
++ schema.addColumn("col5", Type.INT8);
++ schema.addColumn("col6", Type.FLOAT4);
++ schema.addColumn("col7", Type.FLOAT8);
++ schema.addColumn("col8", Type.TEXT);
++ schema.addColumn("col9", Type.BLOB);
++ schema.addColumn("col10", Type.INET4);
++ schema.addColumn("col11", Type.NULL_TYPE);
++
++ if (handleProtobuf) {
++ schema.addColumn("col12", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName()));
++ }
+
+ KeyValueSet options = new KeyValueSet();
+ TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
+ meta.setOptions(CatalogUtil.newPhysicalProperties(storeType));
+ // NULL markers and serde for the text-based formats; presumably ignored by the others.
+ meta.putOption(StorageConstants.TEXT_NULL, "\\\\N");
+ meta.putOption(StorageConstants.RCFILE_NULL, "\\\\N");
+ meta.putOption(StorageConstants.RCFILE_SERDE, TextSerializerDeserializer.class.getName());
+ meta.putOption(StorageConstants.SEQUENCEFILE_NULL, "\\");
+ if (storeType == StoreType.AVRO) {
+ meta.putOption(StorageConstants.AVRO_SCHEMA_LITERAL,
+ TEST_NULL_HANDLING_TYPES_AVRO_SCHEMA);
+ }
+
+ Path tablePath = new Path(testDir, "testVariousTypes.data");
+ FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(conf);
+ Appender appender = sm.getAppender(meta, schema, tablePath);
+ appender.init();
+
+ QueryId queryid = new QueryId("12345", 5);
+ ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName());
-
- Tuple seedTuple = new VTuple(13);
++ int columnNum = 11 + (handleProtobuf ? 1 : 0);
++ Tuple seedTuple = new VTuple(columnNum);
+ seedTuple.put(new Datum[]{
+ DatumFactory.createBool(true), // 0
- DatumFactory.createBit((byte) 0x99), // 1
+ DatumFactory.createChar("hyunsik"), // 1
+ DatumFactory.createInt2((short) 17), // 2
+ DatumFactory.createInt4(59), // 3
+ DatumFactory.createInt8(23L), // 4
+ DatumFactory.createFloat4(77.9f), // 5
+ DatumFactory.createFloat8(271.9f), // 6
+ DatumFactory.createText("hyunsik"), // 7
+ DatumFactory.createBlob("hyunsik".getBytes()),// 8
+ DatumFactory.createInet4("192.168.0.1"), // 9
+ NullDatum.get(), // 10
- factory.createDatum(queryid.getProto()) // 12
+ });
+
++ if (handleProtobuf) {
++ seedTuple.put(11, factory.createDatum(queryid.getProto())); // 11
++ }
++
+ // Making tuples with different null column positions
+ Tuple tuple;
- for (int i = 0; i < 13; i++) {
- tuple = new VTuple(13);
- for (int j = 0; j < 13; j++) {
++ for (int i = 0; i < columnNum; i++) {
++ tuple = new VTuple(columnNum);
++ for (int j = 0; j < columnNum; j++) {
+ if (i == j) { // i'th column will have NULL value
+ tuple.put(j, NullDatum.get());
+ } else {
+ tuple.put(j, seedTuple.get(j));
+ }
+ }
+ appender.addTuple(tuple);
+ }
+ appender.flush();
+ appender.close();
+
+ FileStatus status = fs.getFileStatus(tablePath);
+ FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
+ Scanner scanner = StorageManager.getFileStorageManager(conf).getScanner(meta, schema, fragment);
+ scanner.init();
+
+ Tuple retrieved;
+ int i = 0;
+ while ((retrieved = scanner.next()) != null) {
- assertEquals(13, retrieved.size());
- for (int j = 0; j < 13; j++) {
++ assertEquals(columnNum, retrieved.size());
++ for (int j = 0; j < columnNum; j++) {
+ if (i == j) {
+ assertEquals(NullDatum.get(), retrieved.get(j));
+ } else {
+ assertEquals(seedTuple.get(j), retrieved.get(j));
+ }
+ }
+
+ i++;
+ }
+ scanner.close();
+ // One row was written per column; fail if any row is missing instead of passing vacuously.
+ assertEquals(columnNum, i);
+ }
+
+ /**
+ * RCFile only: round-trips one row of every column type through the RCFile
+ * text serde and checks the values plus the appender/scanner byte and row stats.
+ */
+ @Test
+ public void testRCFileTextSerializeDeserialize() throws IOException {
+ if(storeType != StoreType.RCFILE) return;
+
+ Schema schema = new Schema();
+ schema.addColumn("col1", Type.BOOLEAN);
+ schema.addColumn("col2", Type.BIT);
+ schema.addColumn("col3", Type.CHAR, 7);
+ schema.addColumn("col4", Type.INT2);
+ schema.addColumn("col5", Type.INT4);
+ schema.addColumn("col6", Type.INT8);
+ schema.addColumn("col7", Type.FLOAT4);
+ schema.addColumn("col8", Type.FLOAT8);
+ schema.addColumn("col9", Type.TEXT);
+ schema.addColumn("col10", Type.BLOB);
+ schema.addColumn("col11", Type.INET4);
+ schema.addColumn("col12", Type.NULL_TYPE);
+ schema.addColumn("col13", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName()));
+
+ KeyValueSet options = new KeyValueSet();
+ TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
+ // RCFile reads its serde from RCFILE_SERDE (as in the binary variant below);
+ // setting CSVFILE_SERDE left the RCFile default serde in effect.
+ meta.putOption(StorageConstants.RCFILE_SERDE, TextSerializerDeserializer.class.getName());
+
+ Path tablePath = new Path(testDir, "testVariousTypes.data");
+ FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(conf);
+ Appender appender = sm.getAppender(meta, schema, tablePath);
+ appender.enableStats();
+ appender.init();
+
+ QueryId queryid = new QueryId("12345", 5);
+ ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName());
+
+ Tuple tuple = new VTuple(13);
+ tuple.put(new Datum[] {
+ DatumFactory.createBool(true),
+ DatumFactory.createBit((byte) 0x99),
+ DatumFactory.createChar("jinho"),
+ DatumFactory.createInt2((short) 17),
+ DatumFactory.createInt4(59),
+ DatumFactory.createInt8(23L),
+ DatumFactory.createFloat4(77.9f),
+ DatumFactory.createFloat8(271.9f),
+ DatumFactory.createText("jinho"),
+ DatumFactory.createBlob("hyunsik babo".getBytes()),
+ DatumFactory.createInet4("192.168.0.1"),
+ NullDatum.get(),
+ factory.createDatum(queryid.getProto())
+ });
+ appender.addTuple(tuple);
+ appender.flush();
+ appender.close();
+
+ FileStatus status = fs.getFileStatus(tablePath);
+ assertEquals(appender.getStats().getNumBytes().longValue(), status.getLen());
+
+ FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
+ Scanner scanner = StorageManager.getFileStorageManager(conf).getScanner(meta, schema, fragment);
+ scanner.init();
+
+ Tuple retrieved;
+ while ((retrieved=scanner.next()) != null) {
+ for (int i = 0; i < tuple.size(); i++) {
+ assertEquals(tuple.get(i), retrieved.get(i));
+ }
+ }
+ scanner.close();
+ assertEquals(appender.getStats().getNumBytes().longValue(), scanner.getInputStats().getNumBytes().longValue());
+ assertEquals(appender.getStats().getNumRows().longValue(), scanner.getInputStats().getNumRows().longValue());
+ }
+
+ /**
+ * RCFile only: round-trips one row of every column type through
+ * BinarySerializerDeserializer. Checks the values, that the appender's byte
+ * count equals the file length, and that scanner input stats match the
+ * appender stats.
+ */
+ @Test
+ public void testRCFileBinarySerializeDeserialize() throws IOException {
+ if(storeType != StoreType.RCFILE) return;
+
+ Schema schema = new Schema();
+ schema.addColumn("col1", Type.BOOLEAN);
+ schema.addColumn("col2", Type.BIT);
+ schema.addColumn("col3", Type.CHAR, 7);
+ schema.addColumn("col4", Type.INT2);
+ schema.addColumn("col5", Type.INT4);
+ schema.addColumn("col6", Type.INT8);
+ schema.addColumn("col7", Type.FLOAT4);
+ schema.addColumn("col8", Type.FLOAT8);
+ schema.addColumn("col9", Type.TEXT);
+ schema.addColumn("col10", Type.BLOB);
+ schema.addColumn("col11", Type.INET4);
+ schema.addColumn("col12", Type.NULL_TYPE);
+ schema.addColumn("col13", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName()));
+
+ KeyValueSet options = new KeyValueSet();
+ TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
+ meta.putOption(StorageConstants.RCFILE_SERDE, BinarySerializerDeserializer.class.getName());
+
+ Path tablePath = new Path(testDir, "testVariousTypes.data");
+ FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(conf);
+ Appender appender = sm.getAppender(meta, schema, tablePath);
+ appender.enableStats();
+ appender.init();
+
+ QueryId queryid = new QueryId("12345", 5);
+ ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName());
+
+ Tuple tuple = new VTuple(13);
+ tuple.put(new Datum[] {
+ DatumFactory.createBool(true),
+ DatumFactory.createBit((byte) 0x99),
+ DatumFactory.createChar("jinho"),
+ DatumFactory.createInt2((short) 17),
+ DatumFactory.createInt4(59),
+ DatumFactory.createInt8(23l),
+ DatumFactory.createFloat4(77.9f),
+ DatumFactory.createFloat8(271.9f),
+ DatumFactory.createText("jinho"),
+ DatumFactory.createBlob("hyunsik babo".getBytes()),
+ DatumFactory.createInet4("192.168.0.1"),
+ NullDatum.get(),
+ factory.createDatum(queryid.getProto())
+ });
+ appender.addTuple(tuple);
+ appender.flush();
+ appender.close();
+
+ // The appender's recorded byte count must equal the on-disk length.
+ FileStatus status = fs.getFileStatus(tablePath);
+ assertEquals(appender.getStats().getNumBytes().longValue(), status.getLen());
+
+ FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
+ Scanner scanner = StorageManager.getFileStorageManager(conf).getScanner(meta, schema, fragment);
+ scanner.init();
+
+ Tuple retrieved;
+ while ((retrieved=scanner.next()) != null) {
+ for (int i = 0; i < tuple.size(); i++) {
+ assertEquals(tuple.get(i), retrieved.get(i));
+ }
+ }
+ scanner.close();
+ // Scanner input stats must match what the appender wrote (this also fails if no row was read).
+ assertEquals(appender.getStats().getNumBytes().longValue(), scanner.getInputStats().getNumBytes().longValue());
+ assertEquals(appender.getStats().getNumRows().longValue(), scanner.getInputStats().getNumRows().longValue());
+ }
+
+ /**
+ * SequenceFile only: round-trips one row of every column type through
+ * TextSerializerDeserializer. Checks that the scanner's key is a LongWritable,
+ * that values match, and that scanner input stats match the appender stats.
+ */
+ @Test
+ public void testSequenceFileTextSerializeDeserialize() throws IOException {
+ if(storeType != StoreType.SEQUENCEFILE) return;
+
+ Schema schema = new Schema();
+ schema.addColumn("col1", Type.BOOLEAN);
+ schema.addColumn("col2", Type.BIT);
+ schema.addColumn("col3", Type.CHAR, 7);
+ schema.addColumn("col4", Type.INT2);
+ schema.addColumn("col5", Type.INT4);
+ schema.addColumn("col6", Type.INT8);
+ schema.addColumn("col7", Type.FLOAT4);
+ schema.addColumn("col8", Type.FLOAT8);
+ schema.addColumn("col9", Type.TEXT);
+ schema.addColumn("col10", Type.BLOB);
+ schema.addColumn("col11", Type.INET4);
+ schema.addColumn("col12", Type.NULL_TYPE);
+ schema.addColumn("col13", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName()));
+
+ KeyValueSet options = new KeyValueSet();
+ TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
+ meta.putOption(StorageConstants.SEQUENCEFILE_SERDE, TextSerializerDeserializer.class.getName());
+
+ Path tablePath = new Path(testDir, "testVariousTypes.data");
+ FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(conf);
+ Appender appender = sm.getAppender(meta, schema, tablePath);
+ appender.enableStats();
+ appender.init();
+
+ QueryId queryid = new QueryId("12345", 5);
+ ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName());
+
+ Tuple tuple = new VTuple(13);
+ tuple.put(new Datum[] {
+ DatumFactory.createBool(true),
+ DatumFactory.createBit((byte) 0x99),
+ DatumFactory.createChar("jinho"),
+ DatumFactory.createInt2((short) 17),
+ DatumFactory.createInt4(59),
+ DatumFactory.createInt8(23l),
+ DatumFactory.createFloat4(77.9f),
+ DatumFactory.createFloat8(271.9f),
+ DatumFactory.createText("jinho"),
+ DatumFactory.createBlob("hyunsik babo".getBytes()),
+ DatumFactory.createInet4("192.168.0.1"),
+ NullDatum.get(),
+ factory.createDatum(queryid.getProto())
+ });
+ appender.addTuple(tuple);
+ appender.flush();
+ appender.close();
+
+ FileStatus status = fs.getFileStatus(tablePath);
+ assertEquals(appender.getStats().getNumBytes().longValue(), status.getLen());
+
+ FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
+ Scanner scanner = StorageManager.getFileStorageManager(conf).getScanner(meta, schema, fragment);
+ scanner.init();
+
+ // With the text serde, the SequenceFile key class is expected to be LongWritable.
+ assertTrue(scanner instanceof SequenceFileScanner);
+ Writable key = ((SequenceFileScanner) scanner).getKey();
+ assertEquals(key.getClass().getCanonicalName(), LongWritable.class.getCanonicalName());
+
+ Tuple retrieved;
+ while ((retrieved=scanner.next()) != null) {
+ for (int i = 0; i < tuple.size(); i++) {
+ assertEquals(tuple.get(i), retrieved.get(i));
+ }
+ }
+ scanner.close();
+ assertEquals(appender.getStats().getNumBytes().longValue(), scanner.getInputStats().getNumBytes().longValue());
+ assertEquals(appender.getStats().getNumRows().longValue(), scanner.getInputStats().getNumRows().longValue());
+ }
+
+ /**
+ * SequenceFile only: round-trips one row of every column type through
+ * BinarySerializerDeserializer. Checks that the scanner's key is a
+ * BytesWritable, that values match, and that scanner input stats match the
+ * appender stats.
+ */
+ @Test
+ public void testSequenceFileBinarySerializeDeserialize() throws IOException {
+ if(storeType != StoreType.SEQUENCEFILE) return;
+
+ Schema schema = new Schema();
+ schema.addColumn("col1", Type.BOOLEAN);
+ schema.addColumn("col2", Type.BIT);
+ schema.addColumn("col3", Type.CHAR, 7);
+ schema.addColumn("col4", Type.INT2);
+ schema.addColumn("col5", Type.INT4);
+ schema.addColumn("col6", Type.INT8);
+ schema.addColumn("col7", Type.FLOAT4);
+ schema.addColumn("col8", Type.FLOAT8);
+ schema.addColumn("col9", Type.TEXT);
+ schema.addColumn("col10", Type.BLOB);
+ schema.addColumn("col11", Type.INET4);
+ schema.addColumn("col12", Type.NULL_TYPE);
+ schema.addColumn("col13", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName()));
+
+ KeyValueSet options = new KeyValueSet();
+ TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
+ meta.putOption(StorageConstants.SEQUENCEFILE_SERDE, BinarySerializerDeserializer.class.getName());
+
+ Path tablePath = new Path(testDir, "testVariousTypes.data");
+ FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(conf);
+ Appender appender = sm.getAppender(meta, schema, tablePath);
+ appender.enableStats();
+ appender.init();
+
+ QueryId queryid = new QueryId("12345", 5);
+ ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName());
+
+ Tuple tuple = new VTuple(13);
+ tuple.put(new Datum[] {
+ DatumFactory.createBool(true),
+ DatumFactory.createBit((byte) 0x99),
+ DatumFactory.createChar("jinho"),
+ DatumFactory.createInt2((short) 17),
+ DatumFactory.createInt4(59),
+ DatumFactory.createInt8(23l),
+ DatumFactory.createFloat4(77.9f),
+ DatumFactory.createFloat8(271.9f),
+ DatumFactory.createText("jinho"),
+ DatumFactory.createBlob("hyunsik babo".getBytes()),
+ DatumFactory.createInet4("192.168.0.1"),
+ NullDatum.get(),
+ factory.createDatum(queryid.getProto())
+ });
+ appender.addTuple(tuple);
+ appender.flush();
+ appender.close();
+
+ FileStatus status = fs.getFileStatus(tablePath);
+ assertEquals(appender.getStats().getNumBytes().longValue(), status.getLen());
+
+ FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
+ Scanner scanner = StorageManager.getFileStorageManager(conf).getScanner(meta, schema, fragment);
+ scanner.init();
+
+ // With the binary serde, the SequenceFile key class is expected to be BytesWritable.
+ assertTrue(scanner instanceof SequenceFileScanner);
+ Writable key = ((SequenceFileScanner) scanner).getKey();
+ assertEquals(key.getClass().getCanonicalName(), BytesWritable.class.getCanonicalName());
+
+ Tuple retrieved;
+ while ((retrieved=scanner.next()) != null) {
+ for (int i = 0; i < tuple.size(); i++) {
+ assertEquals(tuple.get(i), retrieved.get(i));
+ }
+ }
+ scanner.close();
+ assertEquals(appender.getStats().getNumBytes().longValue(), scanner.getInputStats().getNumBytes().longValue());
+ assertEquals(appender.getStats().getNumRows().longValue(), scanner.getInputStats().getNumRows().longValue());
+ }
+
+ /**
+ * CSV and RAW only: round-trips one row with DATE, TIME and TIMESTAMP columns
+ * and checks that each value reads back equal.
+ */
+ @Test
+ public void testTime() throws IOException {
+ if (storeType == StoreType.CSV || storeType == StoreType.RAW) {
+ Schema schema = new Schema();
+ schema.addColumn("col1", Type.DATE);
+ schema.addColumn("col2", Type.TIME);
+ schema.addColumn("col3", Type.TIMESTAMP);
+
+ KeyValueSet options = new KeyValueSet();
+ TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
+
+ Path tablePath = new Path(testDir, "testTime.data");
+ FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(conf);
+ Appender appender = sm.getAppender(meta, schema, tablePath);
+ appender.init();
+
+ Tuple tuple = new VTuple(3);
+ tuple.put(new Datum[]{
+ DatumFactory.createDate("1980-04-01"),
+ DatumFactory.createTime("12:34:56"),
+ DatumFactory.createTimestmpDatumWithUnixTime((int)(System.currentTimeMillis() / 1000))
+ });
+ appender.addTuple(tuple);
+ appender.flush();
+ appender.close();
+
+ FileStatus status = fs.getFileStatus(tablePath);
+ FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
+ Scanner scanner = StorageManager.getFileStorageManager(conf).getScanner(meta, schema, fragment);
+ scanner.init();
+
+ Tuple retrieved;
+ int rowCount = 0;
+ while ((retrieved = scanner.next()) != null) {
+ for (int i = 0; i < tuple.size(); i++) {
+ assertEquals(tuple.get(i), retrieved.get(i));
+ }
+ rowCount++;
+ }
+ scanner.close();
+ // Without this, the test would pass vacuously if the scanner returned no rows.
+ assertEquals(1, rowCount);
+ }
+ }
+
+ /**
+ * For seekable formats: writes 100,000 rows while recording the appender's
+ * offset at a few row boundaries and at end of file. Then scans each
+ * [previous offset, offset) range as its own fragment and checks that the
+ * ranges together return every row. When statsable, it also checks that the
+ * summed input stats match the appender's stats.
+ */
+ @Test
+ public void testSeekableScanner() throws IOException {
+ if (!seekable) {
+ return;
+ }
+
+ Schema schema = new Schema();
+ schema.addColumn("id", Type.INT4);
+ schema.addColumn("age", Type.INT8);
+ schema.addColumn("comment", Type.TEXT);
+
+ TableMeta meta = CatalogUtil.newTableMeta(storeType);
+ Path tablePath = new Path(testDir, "Seekable.data");
+ FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(conf);
+ FileAppender appender = (FileAppender) sm.getAppender(meta, schema, tablePath);
+ appender.enableStats();
+ appender.init();
+ int tupleNum = 100000;
+ VTuple vTuple;
+
+ List<Long> offsets = Lists.newArrayList();
+ offsets.add(0L);
+ for (int i = 0; i < tupleNum; i++) {
+ vTuple = new VTuple(3);
+ vTuple.put(0, DatumFactory.createInt4(i + 1));
+ vTuple.put(1, DatumFactory.createInt8(25l));
+ vTuple.put(2, DatumFactory.createText("test" + i));
+ appender.addTuple(vTuple);
+
+ // find a seek position
+ if (i % (tupleNum / 3) == 0) {
+ offsets.add(appender.getOffset());
+ }
+ }
+
+ // end of file
+ if (!offsets.contains(appender.getOffset())) {
+ offsets.add(appender.getOffset());
+ }
+
+ appender.close();
+ if (statsable) {
+ TableStats stat = appender.getStats();
+ assertEquals(tupleNum, stat.getNumRows().longValue());
+ }
+
+ FileStatus status = fs.getFileStatus(tablePath);
+ assertEquals(status.getLen(), appender.getOffset());
+
+ Scanner scanner;
+ int tupleCnt = 0;
+ long prevOffset = 0;
+ long readBytes = 0;
+ long readRows = 0;
+ // Scan each recorded range as a separate fragment; the first range is empty (0 to 0).
+ for (long offset : offsets) {
+ scanner = StorageManager.getFileStorageManager(conf).getScanner(meta, schema,
+ new FileFragment("table", tablePath, prevOffset, offset - prevOffset), schema);
+ scanner.init();
+
+ while (scanner.next() != null) {
+ tupleCnt++;
+ }
+
+ scanner.close();
+ if (statsable) {
+ readBytes += scanner.getInputStats().getNumBytes();
+ readRows += scanner.getInputStats().getNumRows();
+ }
+ prevOffset = offset;
+ }
+
+ assertEquals(tupleNum, tupleCnt);
+ if (statsable) {
+ assertEquals(appender.getStats().getNumBytes().longValue(), readBytes);
+ assertEquals(appender.getStats().getNumRows().longValue(), readRows);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/940546a0/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/avro/TestAvroUtil.java
----------------------------------------------------------------------
diff --cc tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/avro/TestAvroUtil.java
index 4f7ea1c,0000000..7b83894
mode 100644,000000..100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/avro/TestAvroUtil.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/avro/TestAvroUtil.java
@@@ -1,106 -1,0 +1,106 @@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.avro;
+
+import org.apache.avro.Schema;
+import org.apache.tajo.HttpFileServer;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.storage.StorageConstants;
+import org.apache.tajo.util.FileUtil;
+import org.apache.tajo.util.NetUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URISyntaxException;
+import java.net.URL;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests for {@link org.apache.tajo.storage.avro.AvroUtil}.
+ *
+ * <p>Each test obtains an Avro schema through AvroUtil and compares it against the schema parsed
+ * directly from the test resource {@code dataset/testVariousTypes.avsc} in {@link #setUp()}.
+ */
+public class TestAvroUtil {
+ /** Schema parsed directly from the .avsc resource; the reference value for every assertion. */
+ private Schema expected;
+ /** URL of the .avsc test resource, as returned by FileUtil.getResourcePath. */
+ private URL schemaUrl;
+
+ /**
+ * Locates the .avsc test resource, checks that it exists on the local file system, and parses it
+ * into {@link #expected}.
+ */
+ @Before
+ public void setUp() throws Exception {
- schemaUrl = FileUtil.getResourcePath("testVariousTypes.avsc");
++ schemaUrl = FileUtil.getResourcePath("dataset/testVariousTypes.avsc");
+ assertNotNull(schemaUrl);
+
+ // NOTE(review): URL.getPath() does not percent-decode, so this may fail when the checkout path
+ // contains spaces or other escaped characters; new File(schemaUrl.toURI()) would be more robust.
+ File file = new File(schemaUrl.getPath());
+ assertTrue(file.exists());
+
+ expected = new Schema.Parser().parse(file);
+ }
+
+ /**
+ * Checks AvroUtil.getAvroSchema for the three ways the schema is supplied in the table meta here:
+ * an inline literal (AVRO_SCHEMA_LITERAL), a local file path (AVRO_SCHEMA_URL), and an HTTP URL
+ * served by a temporary HttpFileServer (AVRO_SCHEMA_URL).
+ */
+ @Test
+ public void testGetSchema() throws IOException, URISyntaxException {
+ // 1) Schema text passed inline as a literal option.
+ TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.AVRO);
+ meta.putOption(StorageConstants.AVRO_SCHEMA_LITERAL, FileUtil.readTextFile(new File(schemaUrl.getPath())));
+ Schema schema = AvroUtil.getAvroSchema(meta, new TajoConf());
+ assertEquals(expected, schema);
+
+ // 2) Schema referenced by a plain local path.
+ meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.AVRO);
+ meta.putOption(StorageConstants.AVRO_SCHEMA_URL, schemaUrl.getPath());
+ schema = AvroUtil.getAvroSchema(meta, new TajoConf());
+ assertEquals(expected, schema);
+
+ // 3) Schema referenced by an http:// URL. The server is created with port 0 and the real port is
+ // read back from getBindAddress() (presumably an OS-assigned free port).
+ HttpFileServer server = new HttpFileServer(NetUtils.createSocketAddr("127.0.0.1:0"));
+ try {
+ server.start();
+ InetSocketAddress addr = server.getBindAddress();
+
+ // Presumably the server maps the request path onto the local file system; confirm against HttpFileServer.
+ String url = "http://127.0.0.1:" + addr.getPort() + schemaUrl.getPath();
+ meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.AVRO);
+ meta.putOption(StorageConstants.AVRO_SCHEMA_URL, url);
+ schema = AvroUtil.getAvroSchema(meta, new TajoConf());
+ } finally {
+ server.stop();
+ }
+ // The server has already been stopped by the finally block when this assertion runs.
+ assertEquals(expected, schema);
+ }
+
+ /**
+ * Fetches the schema directly with AvroUtil.getAvroSchemaFromHttp from a temporary HttpFileServer
+ * bound to 127.0.0.1, stopping the server afterwards.
+ */
+ @Test
+ public void testGetSchemaFromHttp() throws IOException, URISyntaxException {
+ HttpFileServer server = new HttpFileServer(NetUtils.createSocketAddr("127.0.0.1:0"));
+ try {
+ server.start();
+ InetSocketAddress addr = server.getBindAddress();
+
+ Schema schema = AvroUtil.getAvroSchemaFromHttp("http://127.0.0.1:" + addr.getPort() + schemaUrl.getPath());
+ assertEquals(expected, schema);
+ } finally {
+ server.stop();
+ }
+ }
+
+ /**
+ * Loads the schema with AvroUtil.getAvroSchemaFromFileSystem, passing the resource's full URL
+ * string (presumably a file: URL) rather than a bare path.
+ */
+ @Test
+ public void testGetSchemaFromFileSystem() throws IOException, URISyntaxException {
+ Schema schema = AvroUtil.getAvroSchemaFromFileSystem(schemaUrl.toString(), new TajoConf());
+
+ assertEquals(expected, schema);
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/940546a0/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/json/TestLineReader.java
----------------------------------------------------------------------
diff --cc tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/json/TestLineReader.java
index 0000000,0000000..bf7516f
new file mode 100644
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/json/TestLineReader.java
@@@ -1,0 -1,0 +1,197 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.tajo.storage.json;
++
++import io.netty.buffer.ByteBuf;
++import org.apache.hadoop.fs.FileStatus;
++import org.apache.hadoop.fs.FileSystem;
++import org.apache.hadoop.fs.Path;
++import org.apache.hadoop.io.IOUtils;
++import org.apache.hadoop.io.compress.DeflateCodec;
++import org.apache.tajo.catalog.CatalogUtil;
++import org.apache.tajo.catalog.Schema;
++import org.apache.tajo.catalog.TableMeta;
++import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
++import org.apache.tajo.common.TajoDataTypes.Type;
++import org.apache.tajo.conf.TajoConf;
++import org.apache.tajo.datum.DatumFactory;
++import org.apache.tajo.datum.NullDatum;
++import org.apache.tajo.storage.ByteBufInputChannel;
++import org.apache.tajo.storage.FileAppender;
++import org.apache.tajo.storage.StorageManager;
++import org.apache.tajo.storage.VTuple;
++import org.apache.tajo.storage.fragment.FileFragment;
++import org.apache.tajo.storage.text.ByteBufLineReader;
++import org.apache.tajo.storage.text.DelimitedLineReader;
++import org.apache.tajo.storage.text.DelimitedTextFile;
++import org.apache.tajo.util.CommonTestingUtil;
++import org.apache.tajo.util.FileUtil;
++import org.junit.Test;
++
++import java.io.File;
++import java.io.FileInputStream;
++import java.io.IOException;
++import java.net.URISyntaxException;
++import java.util.concurrent.atomic.AtomicInteger;
++
++import static org.junit.Assert.*;
++
++/**
++ * Tests for the line readers used by the text storage formats.
++ *
++ * <p>{@link ByteBufLineReader} is exercised over a generated uncompressed TEXTFILE and over a small
++ * checked-in resource; {@link DelimitedLineReader} is exercised over a Deflate-compressed TEXTFILE.
++ */
++public class TestLineReader {
++ /** Working directory for the files generated by these tests. */
++ private static final String TEST_PATH = "target/test-data/TestLineReader";
++
++ /**
++ * Writes {@code tupleNum} four-column rows as an uncompressed TEXTFILE, reads the file back with a
++ * {@link ByteBufLineReader}, and checks that one line is read per row and that the per-line byte
++ * counts add up to the file length.
++ */
++ @Test
++ public void testByteBufLineReader() throws IOException {
++ TajoConf conf = new TajoConf();
++ Path testDir = CommonTestingUtil.getTestDir(TEST_PATH);
++ FileSystem fs = testDir.getFileSystem(conf);
++
++ Schema schema = new Schema();
++ schema.addColumn("id", Type.INT4);
++ schema.addColumn("age", Type.INT8);
++ schema.addColumn("comment", Type.TEXT);
++ schema.addColumn("comment2", Type.TEXT);
++
++ TableMeta meta = CatalogUtil.newTableMeta(StoreType.TEXTFILE);
++ Path tablePath = new Path(testDir, "line.data");
++ FileAppender appender = (FileAppender) StorageManager.getFileStorageManager(conf).getAppender(
++ null, null, meta, schema, tablePath);
++ appender.enableStats();
++ appender.init();
++ int tupleNum = 10000;
++ VTuple vTuple;
++
++ for (int i = 0; i < tupleNum; i++) {
++ vTuple = new VTuple(4);
++ vTuple.put(0, DatumFactory.createInt4(i + 1));
++ vTuple.put(1, DatumFactory.createInt8(25L));
++ vTuple.put(2, DatumFactory.createText("emiya muljomdao"));
++ vTuple.put(3, NullDatum.get());
++ appender.addTuple(vTuple);
++ }
++ appender.close();
++
++ FileStatus status = fs.getFileStatus(tablePath);
++
++ ByteBufInputChannel channel = new ByteBufInputChannel(fs.open(tablePath));
++ assertEquals(status.getLen(), channel.available());
++ ByteBufLineReader reader = new ByteBufLineReader(channel);
++ assertEquals(status.getLen(), reader.available());
++
++ long totalRead = 0;
++ int i = 0;
++ // readLineBuf stores each line's byte count in 'bytes' (summed and compared to the file length
++ // below); a null return ends the loop.
++ AtomicInteger bytes = new AtomicInteger();
++ for (;;) {
++ ByteBuf buf = reader.readLineBuf(bytes);
++ if (buf == null) {
++ break;
++ }
++
++ totalRead += bytes.get();
++ i++;
++ }
++ IOUtils.cleanup(null, reader, channel, fs);
++ assertEquals(tupleNum, i);
++ assertEquals(status.getLen(), totalRead);
++ assertEquals(status.getLen(), reader.readBytes());
++ }
++
++ /**
++ * Writes {@code tupleNum} rows as a Deflate-compressed TEXTFILE and builds a fragment that ends
++ * mid-file. It then checks that a {@link DelimitedLineReader} reports the file as compressed, only
++ * becomes readable after init(), and still returns every row.
++ */
++ @Test
++ public void testLineDelimitedReader() throws IOException {
++ TajoConf conf = new TajoConf();
++ Path testDir = CommonTestingUtil.getTestDir(TEST_PATH);
++ FileSystem fs = testDir.getFileSystem(conf);
++
++ Schema schema = new Schema();
++ schema.addColumn("id", Type.INT4);
++ schema.addColumn("age", Type.INT8);
++ schema.addColumn("comment", Type.TEXT);
++ schema.addColumn("comment2", Type.TEXT);
++
++ TableMeta meta = CatalogUtil.newTableMeta(StoreType.TEXTFILE);
++ meta.putOption("compression.codec", DeflateCodec.class.getCanonicalName());
++
++ Path tablePath = new Path(testDir, "line1." + DeflateCodec.class.getSimpleName());
++ FileAppender appender = (FileAppender) StorageManager.getFileStorageManager(conf).getAppender(
++ null, null, meta, schema, tablePath);
++ appender.enableStats();
++ appender.init();
++ int tupleNum = 10000;
++ VTuple vTuple;
++
++ // Offset recorded just after row tupleNum / 2 is written; used as the fragment length below.
++ long splitOffset = 0;
++ for (int i = 0; i < tupleNum; i++) {
++ vTuple = new VTuple(4);
++ vTuple.put(0, DatumFactory.createInt4(i + 1));
++ vTuple.put(1, DatumFactory.createInt8(25L));
++ vTuple.put(2, DatumFactory.createText("emiya muljomdao"));
++ vTuple.put(3, NullDatum.get());
++ appender.addTuple(vTuple);
++
++ if (i == (tupleNum / 2)) {
++ splitOffset = appender.getOffset();
++ }
++ }
++ String extension = ((DelimitedTextFile.DelimitedTextFileAppender) appender).getExtension();
++ appender.close();
++
++ // The appender writes to tablePath + extension, so the reader must open the suffixed path.
++ tablePath = tablePath.suffix(extension);
++ FileFragment fragment = new FileFragment("table", tablePath, 0, splitOffset);
++ DelimitedLineReader reader = new DelimitedLineReader(conf, fragment); // if file is compressed, will read to EOF
++ assertTrue(reader.isCompressed());
++ assertFalse(reader.isReadable());
++ reader.init();
++ assertTrue(reader.isReadable());
++
++ int i = 0;
++ while (reader.isReadable()) {
++ ByteBuf buf = reader.readLine();
++ if (buf == null) {
++ break;
++ }
++ i++;
++ }
++
++ IOUtils.cleanup(null, reader, fs);
++ assertEquals(tupleNum, i);
++ }
++
++ /**
++ * Reads the checked-in resource {@code dataset/testLineText.txt} with a {@link ByteBufLineReader}.
++ * It checks that the consumed bytes equal the file length and that the line count matches
++ * {@code data.split("\n").length} for the file's text.
++ */
++ @Test
++ public void testByteBufLineReaderWithoutTerminating() throws IOException, URISyntaxException {
++ // Convert via toURI() rather than URL.getFile(): getFile() leaves the path percent-encoded, which
++ // breaks when the checkout directory contains spaces or other escaped characters.
++ File file = new File(FileUtil.getResourcePath("dataset/testLineText.txt").toURI());
++ String data = FileUtil.readTextFile(file);
++
++ ByteBufInputChannel channel = new ByteBufInputChannel(new FileInputStream(file));
++
++ assertEquals(file.length(), channel.available());
++ ByteBufLineReader reader = new ByteBufLineReader(channel);
++ assertEquals(file.length(), reader.available());
++
++ long totalRead = 0;
++ int i = 0;
++ AtomicInteger bytes = new AtomicInteger();
++ for (;;) {
++ ByteBuf buf = reader.readLineBuf(bytes);
++ if (buf == null) {
++ break;
++ }
++ totalRead += bytes.get();
++ i++;
++ }
++ // Clean up the channel as well, matching testByteBufLineReader.
++ IOUtils.cleanup(null, reader, channel);
++ assertEquals(file.length(), totalRead);
++ assertEquals(file.length(), reader.readBytes());
++ assertEquals(data.split("\n").length, i);
++ }
++}
http://git-wip-us.apache.org/repos/asf/tajo/blob/940546a0/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/TestJsonSerDe/testVariousType.json
----------------------------------------------------------------------
diff --cc tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/TestJsonSerDe/testVariousType.json
index 0000000,0000000..8ee3408
new file mode 100644
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/TestJsonSerDe/testVariousType.json
@@@ -1,0 -1,0 +1,1 @@@
++{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, "col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "hyunsik", "col10": "192.168.0.1"}
http://git-wip-us.apache.org/repos/asf/tajo/blob/940546a0/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/testLineText.txt
----------------------------------------------------------------------
diff --cc tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/testLineText.txt
index 0000000,0000000..7403c26
new file mode 100644
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/testLineText.txt
@@@ -1,0 -1,0 +1,2 @@@
++1|25|emiya muljomdao
++2|25|emiya muljomdao
http://git-wip-us.apache.org/repos/asf/tajo/blob/940546a0/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/testVariousTypes.avsc
----------------------------------------------------------------------
diff --cc tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/testVariousTypes.avsc
index 0000000,0000000..d4250a9
new file mode 100644
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/testVariousTypes.avsc
@@@ -1,0 -1,0 +1,20 @@@
++{
++ "type": "record",
++ "namespace": "org.apache.tajo",
++ "name": "testVariousTypes",
++ "fields": [
++ { "name": "col1", "type": "boolean" },
++ { "name": "col2", "type": "string" },
++ { "name": "col3", "type": "int" },
++ { "name": "col4", "type": "int" },
++ { "name": "col5", "type": "long" },
++ { "name": "col6", "type": "float" },
++ { "name": "col7", "type": "double" },
++ { "name": "col8", "type": "string" },
++ { "name": "col9", "type": "bytes" },
++ { "name": "col10", "type": "bytes" },
++ { "name": "col11", "type": "null" },
++ { "name": "col12", "type": "bytes" }
++ ]
++}
++
http://git-wip-us.apache.org/repos/asf/tajo/blob/940546a0/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml
----------------------------------------------------------------------
diff --cc tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml
index 6190d1a,0000000..737284b
mode 100644,000000..100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml
+++ b/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml
@@@ -1,164 -1,0 +1,178 @@@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+
+<configuration>
+ <property>
+ <name>fs.s3.impl</name>
+ <value>org.apache.tajo.storage.s3.SmallBlockS3FileSystem</value>
+ </property>
+
+ <!-- Storage Manager Configuration -->
+ <property>
+ <name>tajo.storage.manager.hdfs.class</name>
+ <value>org.apache.tajo.storage.FileStorageManager</value>
+ </property>
+ <property>
+ <name>tajo.storage.manager.hbase.class</name>
+ <value>org.apache.tajo.storage.hbase.HBaseStorageManager</value>
+ </property>
+
+ <!--- Registered Scanner Handler -->
+ <property>
+ <name>tajo.storage.scanner-handler</name>
- <value>textfile,csv,raw,rcfile,row,parquet,sequencefile,avro</value>
++ <value>textfile,csv,json,raw,rcfile,row,parquet,sequencefile,avro</value>
+ </property>
+
+ <!--- Fragment Class Configurations -->
+ <property>
+ <name>tajo.storage.fragment.textfile.class</name>
+ <value>org.apache.tajo.storage.fragment.FileFragment</value>
+ </property>
+ <property>
+ <name>tajo.storage.fragment.csv.class</name>
+ <value>org.apache.tajo.storage.fragment.FileFragment</value>
+ </property>
+ <property>
++ <name>tajo.storage.fragment.json.class</name>
++ <value>org.apache.tajo.storage.fragment.FileFragment</value>
++ </property>
++ <property>
+ <name>tajo.storage.fragment.raw.class</name>
+ <value>org.apache.tajo.storage.fragment.FileFragment</value>
+ </property>
+ <property>
+ <name>tajo.storage.fragment.rcfile.class</name>
+ <value>org.apache.tajo.storage.fragment.FileFragment</value>
+ </property>
+ <property>
+ <name>tajo.storage.fragment.row.class</name>
+ <value>org.apache.tajo.storage.fragment.FileFragment</value>
+ </property>
+ <property>
+ <name>tajo.storage.fragment.parquet.class</name>
+ <value>org.apache.tajo.storage.fragment.FileFragment</value>
+ </property>
+ <property>
+ <name>tajo.storage.fragment.sequencefile.class</name>
+ <value>org.apache.tajo.storage.fragment.FileFragment</value>
+ </property>
+ <property>
+ <name>tajo.storage.fragment.avro.class</name>
+ <value>org.apache.tajo.storage.fragment.FileFragment</value>
+ </property>
+
+ <!--- Scanner Handler -->
+ <property>
+ <name>tajo.storage.scanner-handler.textfile.class</name>
+ <value>org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileScanner</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.scanner-handler.csv.class</name>
+ <value>org.apache.tajo.storage.CSVFile$CSVScanner</value>
+ </property>
+
+ <property>
++ <name>tajo.storage.scanner-handler.json.class</name>
++ <value>org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileScanner</value>
++ </property>
++
++ <property>
+ <name>tajo.storage.scanner-handler.raw.class</name>
+ <value>org.apache.tajo.storage.RawFile$RawFileScanner</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.scanner-handler.rcfile.class</name>
+ <value>org.apache.tajo.storage.rcfile.RCFile$RCFileScanner</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.scanner-handler.rowfile.class</name>
+ <value>org.apache.tajo.storage.RowFile$RowFileScanner</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.scanner-handler.parquet.class</name>
+ <value>org.apache.tajo.storage.parquet.ParquetScanner</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.scanner-handler.sequencefile.class</name>
+ <value>org.apache.tajo.storage.sequencefile.SequenceFileScanner</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.scanner-handler.avro.class</name>
+ <value>org.apache.tajo.storage.avro.AvroScanner</value>
+ </property>
+
+ <!--- Appender Handler -->
+ <property>
+ <name>tajo.storage.appender-handler</name>
- <value>textfile,csv,raw,rcfile,row,parquet,sequencefile,avro</value>
++ <value>textfile,csv,json,raw,rcfile,row,parquet,sequencefile,avro</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.appender-handler.textfile.class</name>
+ <value>org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileAppender</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.appender-handler.csv.class</name>
+ <value>org.apache.tajo.storage.CSVFile$CSVAppender</value>
+ </property>
+
+ <property>
++ <name>tajo.storage.appender-handler.json.class</name>
++ <value>org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileAppender</value>
++ </property>
++
++ <property>
+ <name>tajo.storage.appender-handler.raw.class</name>
+ <value>org.apache.tajo.storage.RawFile$RawFileAppender</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.appender-handler.rcfile.class</name>
+ <value>org.apache.tajo.storage.rcfile.RCFile$RCFileAppender</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.appender-handler.rowfile.class</name>
+ <value>org.apache.tajo.storage.RowFile$RowFileAppender</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.appender-handler.parquet.class</name>
+ <value>org.apache.tajo.storage.parquet.ParquetAppender</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.appender-handler.sequencefile.class</name>
+ <value>org.apache.tajo.storage.sequencefile.SequenceFileAppender</value>
+ </property>
+
+ <property>
+ <name>tajo.storage.appender-handler.avro.class</name>
+ <value>org.apache.tajo.storage.avro.AvroAppender</value>
+ </property>
+</configuration>