You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hj...@apache.org on 2014/12/03 06:30:33 UTC
[18/30] tajo git commit: TAJO-1122: Refactor the tajo-storage project
structure.
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/resources/storage-default.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/resources/storage-default.xml b/tajo-storage/src/main/resources/storage-default.xml
deleted file mode 100644
index 85e5f41..0000000
--- a/tajo-storage/src/main/resources/storage-default.xml
+++ /dev/null
@@ -1,180 +0,0 @@
-<?xml version="1.0"?>
-<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
-
-<!--
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- -->
-
-<configuration>
- <property>
- <name>tajo.storage.manager.maxReadBytes</name>
- <value>8388608</value>
- <description></description>
- </property>
-
- <property>
- <name>tajo.storage.manager.concurrency.perDisk</name>
- <value>1</value>
- <description></description>
- </property>
-
- <!--- Registered Scanner Handler -->
- <property>
- <name>tajo.storage.scanner-handler</name>
- <value>textfile,csv,raw,rcfile,row,parquet,sequencefile,avro,hbase</value>
- </property>
-
- <!--- Fragment Class Configurations -->
- <property>
- <name>tajo.storage.fragment.textfile.class</name>
- <value>org.apache.tajo.storage.fragment.FileFragment</value>
- </property>
- <property>
- <name>tajo.storage.fragment.csv.class</name>
- <value>org.apache.tajo.storage.fragment.FileFragment</value>
- </property>
- <property>
- <name>tajo.storage.fragment.raw.class</name>
- <value>org.apache.tajo.storage.fragment.FileFragment</value>
- </property>
- <property>
- <name>tajo.storage.fragment.rcfile.class</name>
- <value>org.apache.tajo.storage.fragment.FileFragment</value>
- </property>
- <property>
- <name>tajo.storage.fragment.row.class</name>
- <value>org.apache.tajo.storage.fragment.FileFragment</value>
- </property>
- <property>
- <name>tajo.storage.fragment.parquet.class</name>
- <value>org.apache.tajo.storage.fragment.FileFragment</value>
- </property>
- <property>
- <name>tajo.storage.fragment.sequencefile.class</name>
- <value>org.apache.tajo.storage.fragment.FileFragment</value>
- </property>
- <property>
- <name>tajo.storage.fragment.avro.class</name>
- <value>org.apache.tajo.storage.fragment.FileFragment</value>
- </property>
- <property>
- <name>tajo.storage.fragment.hbase.class</name>
- <value>org.apache.tajo.storage.hbase.HBaseFragment</value>
- </property>
-
- <!--- Scanner Handler -->
- <property>
- <name>tajo.storage.scanner-handler.textfile.class</name>
- <value>org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileScanner</value>
- </property>
-
- <property>
- <name>tajo.storage.scanner-handler.csv.class</name>
- <value>org.apache.tajo.storage.CSVFile$CSVScanner</value>
- </property>
-
- <property>
- <name>tajo.storage.scanner-handler.raw.class</name>
- <value>org.apache.tajo.storage.RawFile$RawFileScanner</value>
- </property>
-
- <property>
- <name>tajo.storage.scanner-handler.rcfile.class</name>
- <value>org.apache.tajo.storage.rcfile.RCFile$RCFileScanner</value>
- </property>
-
- <property>
- <name>tajo.storage.scanner-handler.rowfile.class</name>
- <value>org.apache.tajo.storage.RowFile$RowFileScanner</value>
- </property>
-
- <property>
- <name>tajo.storage.scanner-handler.parquet.class</name>
- <value>org.apache.tajo.storage.parquet.ParquetScanner</value>
- </property>
-
- <property>
- <name>tajo.storage.scanner-handler.sequencefile.class</name>
- <value>org.apache.tajo.storage.sequencefile.SequenceFileScanner</value>
- </property>
-
- <property>
- <name>tajo.storage.scanner-handler.avro.class</name>
- <value>org.apache.tajo.storage.avro.AvroScanner</value>
- </property>
-
- <property>
- <name>tajo.storage.scanner-handler.hbase.class</name>
- <value>org.apache.tajo.storage.hbase.HBaseScanner</value>
- </property>
-
- <!--- Appender Handler -->
- <property>
- <name>tajo.storage.appender-handler</name>
- <value>textfile,csv,raw,rcfile,row,parquet,sequencefile,avro,hbase</value>
- </property>
-
- <property>
- <name>tajo.storage.appender-handler.textfile.class</name>
- <value>org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileAppender</value>
- </property>
-
- <property>
- <name>tajo.storage.appender-handler.csv.class</name>
- <value>org.apache.tajo.storage.CSVFile$CSVAppender</value>
- </property>
-
- <property>
- <name>tajo.storage.appender-handler.raw.class</name>
- <value>org.apache.tajo.storage.RawFile$RawFileAppender</value>
- </property>
-
- <property>
- <name>tajo.storage.appender-handler.rcfile.class</name>
- <value>org.apache.tajo.storage.rcfile.RCFile$RCFileAppender</value>
- </property>
-
- <property>
- <name>tajo.storage.appender-handler.rowfile.class</name>
- <value>org.apache.tajo.storage.RowFile$RowFileAppender</value>
- </property>
-
- <property>
- <name>tajo.storage.appender-handler.parquet.class</name>
- <value>org.apache.tajo.storage.parquet.ParquetAppender</value>
- </property>
-
- <property>
- <name>tajo.storage.appender-handler.sequencefile.class</name>
- <value>org.apache.tajo.storage.sequencefile.SequenceFileAppender</value>
- </property>
-
- <property>
- <name>tajo.storage.appender-handler.avro.class</name>
- <value>org.apache.tajo.storage.avro.AvroAppender</value>
- </property>
-
- <property>
- <name>tajo.storage.appender-handler.hbase.class</name>
- <value>org.apache.tajo.storage.hbase.HFileAppender</value>
- </property>
-
- <property>
- <name>tajo.storage.appender-handler.hfile.class</name>
- <value>org.apache.tajo.storage.hbase.HFileAppender</value>
- </property>
-</configuration>
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/test/java/org/apache/tajo/HttpFileServer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/HttpFileServer.java b/tajo-storage/src/test/java/org/apache/tajo/HttpFileServer.java
deleted file mode 100644
index cf8a54e..0000000
--- a/tajo-storage/src/test/java/org/apache/tajo/HttpFileServer.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.net.NetUtils;
-import org.jboss.netty.bootstrap.ServerBootstrap;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.group.ChannelGroup;
-import org.jboss.netty.channel.group.ChannelGroupFuture;
-import org.jboss.netty.channel.group.DefaultChannelGroup;
-import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
-
-import java.net.InetSocketAddress;
-import java.util.concurrent.Executors;
-
-public class HttpFileServer {
- private final static Log LOG = LogFactory.getLog(HttpFileServer.class);
-
- private final InetSocketAddress addr;
- private InetSocketAddress bindAddr;
- private ServerBootstrap bootstrap = null;
- private ChannelFactory factory = null;
- private ChannelGroup channelGroup = null;
-
- public HttpFileServer(final InetSocketAddress addr) {
- this.addr = addr;
- this.factory = new NioServerSocketChannelFactory(
- Executors.newCachedThreadPool(), Executors.newCachedThreadPool(),
- 2);
-
- // Configure the server.
- this.bootstrap = new ServerBootstrap(factory);
- // Set up the event pipeline factory.
- this.bootstrap.setPipelineFactory(new HttpFileServerPipelineFactory());
- this.channelGroup = new DefaultChannelGroup();
- }
-
- public HttpFileServer(String bindaddr) {
- this(NetUtils.createSocketAddr(bindaddr));
- }
-
- public void start() {
- // Bind and start to accept incoming connections.
- Channel channel = bootstrap.bind(addr);
- channelGroup.add(channel);
- this.bindAddr = (InetSocketAddress) channel.getLocalAddress();
- LOG.info("HttpFileServer starts up ("
- + this.bindAddr.getAddress().getHostAddress() + ":" + this.bindAddr.getPort()
- + ")");
- }
-
- public InetSocketAddress getBindAddress() {
- return this.bindAddr;
- }
-
- public void stop() {
- ChannelGroupFuture future = channelGroup.close();
- future.awaitUninterruptibly();
- factory.releaseExternalResources();
-
- LOG.info("HttpFileServer shutdown ("
- + this.bindAddr.getAddress().getHostAddress() + ":"
- + this.bindAddr.getPort() + ")");
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/test/java/org/apache/tajo/HttpFileServerHandler.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/HttpFileServerHandler.java b/tajo-storage/src/test/java/org/apache/tajo/HttpFileServerHandler.java
deleted file mode 100644
index 6c77317..0000000
--- a/tajo-storage/src/test/java/org/apache/tajo/HttpFileServerHandler.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo;
-
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.*;
-import org.jboss.netty.handler.codec.frame.TooLongFrameException;
-import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
-import org.jboss.netty.handler.codec.http.HttpRequest;
-import org.jboss.netty.handler.codec.http.HttpResponse;
-import org.jboss.netty.handler.codec.http.HttpResponseStatus;
-import org.jboss.netty.handler.ssl.SslHandler;
-import org.jboss.netty.handler.stream.ChunkedFile;
-import org.jboss.netty.util.CharsetUtil;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.RandomAccessFile;
-import java.io.UnsupportedEncodingException;
-import java.net.URLDecoder;
-
-import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
-import static org.jboss.netty.handler.codec.http.HttpHeaders.isKeepAlive;
-import static org.jboss.netty.handler.codec.http.HttpHeaders.setContentLength;
-import static org.jboss.netty.handler.codec.http.HttpMethod.GET;
-import static org.jboss.netty.handler.codec.http.HttpResponseStatus.*;
-import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;
-
-/**
- * this is an implementation copied from HttpStaticFileServerHandler.java of netty 3.6
- */
-public class HttpFileServerHandler extends SimpleChannelUpstreamHandler {
-
- @Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
- HttpRequest request = (HttpRequest) e.getMessage();
- if (request.getMethod() != GET) {
- sendError(ctx, METHOD_NOT_ALLOWED);
- return;
- }
-
- final String path = sanitizeUri(request.getUri());
- if (path == null) {
- sendError(ctx, FORBIDDEN);
- return;
- }
-
- File file = new File(path);
- if (file.isHidden() || !file.exists()) {
- sendError(ctx, NOT_FOUND);
- return;
- }
- if (!file.isFile()) {
- sendError(ctx, FORBIDDEN);
- return;
- }
-
- RandomAccessFile raf;
- try {
- raf = new RandomAccessFile(file, "r");
- } catch (FileNotFoundException fnfe) {
- sendError(ctx, NOT_FOUND);
- return;
- }
- long fileLength = raf.length();
-
- HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
- setContentLength(response, fileLength);
- setContentTypeHeader(response);
-
- Channel ch = e.getChannel();
-
- // Write the initial line and the header.
- ch.write(response);
-
- // Write the content.
- ChannelFuture writeFuture;
- if (ch.getPipeline().get(SslHandler.class) != null) {
- // Cannot use zero-copy with HTTPS.
- writeFuture = ch.write(new ChunkedFile(raf, 0, fileLength, 8192));
- } else {
- // No encryption - use zero-copy.
- final FileRegion region =
- new DefaultFileRegion(raf.getChannel(), 0, fileLength);
- writeFuture = ch.write(region);
- writeFuture.addListener(new ChannelFutureProgressListener() {
- public void operationComplete(ChannelFuture future) {
- region.releaseExternalResources();
- }
-
- public void operationProgressed(
- ChannelFuture future, long amount, long current, long total) {
- System.out.printf("%s: %d / %d (+%d)%n", path, current, total, amount);
- }
- });
- }
-
- // Decide whether to close the connection or not.
- if (!isKeepAlive(request)) {
- // Close the connection when the whole content is written out.
- writeFuture.addListener(ChannelFutureListener.CLOSE);
- }
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
- throws Exception {
- Channel ch = e.getChannel();
- Throwable cause = e.getCause();
- if (cause instanceof TooLongFrameException) {
- sendError(ctx, BAD_REQUEST);
- return;
- }
-
- cause.printStackTrace();
- if (ch.isConnected()) {
- sendError(ctx, INTERNAL_SERVER_ERROR);
- }
- }
-
- private static String sanitizeUri(String uri) {
- // Decode the path.
- try {
- uri = URLDecoder.decode(uri, "UTF-8");
- } catch (UnsupportedEncodingException e) {
- try {
- uri = URLDecoder.decode(uri, "ISO-8859-1");
- } catch (UnsupportedEncodingException e1) {
- throw new Error();
- }
- }
-
- // Convert file separators.
- uri = uri.replace('/', File.separatorChar);
-
- // Simplistic dumb security check.
- // You will have to do something serious in the production environment.
- if (uri.contains(File.separator + '.') ||
- uri.contains('.' + File.separator) ||
- uri.startsWith(".") || uri.endsWith(".")) {
- return null;
- }
-
- return uri;
- }
-
- private static void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) {
- HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status);
- response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8");
- response.setContent(ChannelBuffers.copiedBuffer(
- "Failure: " + status.toString() + "\r\n",
- CharsetUtil.UTF_8));
-
- // Close the connection as soon as the error message is sent.
- ctx.getChannel().write(response).addListener(ChannelFutureListener.CLOSE);
- }
-
- /**
- * Sets the content type header for the HTTP Response
- *
- * @param response
- * HTTP response
- */
- private static void setContentTypeHeader(HttpResponse response) {
- response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8");
- }
-
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/test/java/org/apache/tajo/HttpFileServerPipelineFactory.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/HttpFileServerPipelineFactory.java b/tajo-storage/src/test/java/org/apache/tajo/HttpFileServerPipelineFactory.java
deleted file mode 100644
index cecf93b..0000000
--- a/tajo-storage/src/test/java/org/apache/tajo/HttpFileServerPipelineFactory.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo;
-
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.handler.codec.http.HttpChunkAggregator;
-import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
-import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
-import org.jboss.netty.handler.stream.ChunkedWriteHandler;
-
-import static org.jboss.netty.channel.Channels.pipeline;
-
-// Uncomment the following lines if you want HTTPS
-//import javax.net.ssl.SSLEngine;
-//import org.jboss.netty.example.securechat.SecureChatSslContextFactory;
-//import org.jboss.netty.handler.ssl.SslHandler;
-
-//this class is copied from HttpStaticFileServerPipelineFactory.java of netty 3.6
-public class HttpFileServerPipelineFactory implements ChannelPipelineFactory {
- public ChannelPipeline getPipeline() throws Exception {
- // Create a default pipeline implementation.
- ChannelPipeline pipeline = pipeline();
-
- // Uncomment the following lines if you want HTTPS
- //SSLEngine engine = SecureChatSslContextFactory.getServerContext().createSSLEngine();
- //engine.setUseClientMode(false);
- //pipeline.addLast("ssl", new SslHandler(engine));
-
- pipeline.addLast("decoder", new HttpRequestDecoder());
- pipeline.addLast("aggregator", new HttpChunkAggregator(65536));
- pipeline.addLast("encoder", new HttpResponseEncoder());
- pipeline.addLast("chunkedWriter", new ChunkedWriteHandler());
-
- pipeline.addLast("handler", new HttpFileServerHandler());
- return pipeline;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
deleted file mode 100644
index ea46fa7..0000000
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.compress.*;
-import org.apache.hadoop.io.compress.zlib.ZlibFactory;
-import org.apache.hadoop.util.NativeCodeLoader;
-import org.apache.tajo.catalog.CatalogUtil;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.common.TajoDataTypes.Type;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.storage.sequencefile.SequenceFileScanner;
-import org.apache.tajo.storage.text.DelimitedTextFile;
-import org.apache.tajo.util.CommonTestingUtil;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
-
-import static org.junit.Assert.*;
-
-@RunWith(Parameterized.class)
-public class TestCompressionStorages {
- private TajoConf conf;
- private static String TEST_PATH = "target/test-data/TestCompressionStorages";
-
- private StoreType storeType;
- private Path testDir;
- private FileSystem fs;
-
- public TestCompressionStorages(StoreType type) throws IOException {
- this.storeType = type;
- conf = new TajoConf();
-
- testDir = CommonTestingUtil.getTestDir(TEST_PATH);
- fs = testDir.getFileSystem(conf);
- }
-
- @Parameterized.Parameters
- public static Collection<Object[]> generateParameters() {
- return Arrays.asList(new Object[][]{
- {StoreType.CSV},
- {StoreType.RCFILE},
- {StoreType.SEQUENCEFILE},
- {StoreType.TEXTFILE}
- });
- }
-
- @Test
- public void testDeflateCodecCompressionData() throws IOException {
- storageCompressionTest(storeType, DeflateCodec.class);
- }
-
- @Test
- public void testGzipCodecCompressionData() throws IOException {
- if (storeType == StoreType.RCFILE) {
- if( ZlibFactory.isNativeZlibLoaded(conf)) {
- storageCompressionTest(storeType, GzipCodec.class);
- }
- } else if (storeType == StoreType.SEQUENCEFILE) {
- if( ZlibFactory.isNativeZlibLoaded(conf)) {
- storageCompressionTest(storeType, GzipCodec.class);
- }
- } else {
- storageCompressionTest(storeType, GzipCodec.class);
- }
- }
-
- @Test
- public void testSnappyCodecCompressionData() throws IOException {
- if (SnappyCodec.isNativeCodeLoaded()) {
- storageCompressionTest(storeType, SnappyCodec.class);
- }
- }
-
- @Test
- public void testLz4CodecCompressionData() throws IOException {
- if(NativeCodeLoader.isNativeCodeLoaded() && Lz4Codec.isNativeCodeLoaded())
- storageCompressionTest(storeType, Lz4Codec.class);
- }
-
- private void storageCompressionTest(StoreType storeType, Class<? extends CompressionCodec> codec) throws IOException {
- Schema schema = new Schema();
- schema.addColumn("id", Type.INT4);
- schema.addColumn("age", Type.FLOAT4);
- schema.addColumn("name", Type.TEXT);
-
- TableMeta meta = CatalogUtil.newTableMeta(storeType);
- meta.putOption("compression.codec", codec.getCanonicalName());
- meta.putOption("compression.type", SequenceFile.CompressionType.BLOCK.name());
- meta.putOption("rcfile.serde", TextSerializerDeserializer.class.getName());
- meta.putOption("sequencefile.serde", TextSerializerDeserializer.class.getName());
-
- String fileName = "Compression_" + codec.getSimpleName();
- Path tablePath = new Path(testDir, fileName);
- Appender appender = StorageManager.getFileStorageManager(conf).getAppender(meta, schema, tablePath);
- appender.enableStats();
-
- appender.init();
-
- String extension = "";
- if (appender instanceof CSVFile.CSVAppender) {
- extension = ((CSVFile.CSVAppender) appender).getExtension();
- } else if (appender instanceof DelimitedTextFile.DelimitedTextFileAppender) {
- extension = ((DelimitedTextFile.DelimitedTextFileAppender) appender).getExtension();
- }
-
- int tupleNum = 100000;
- VTuple vTuple;
-
- for (int i = 0; i < tupleNum; i++) {
- vTuple = new VTuple(3);
- vTuple.put(0, DatumFactory.createInt4(i + 1));
- vTuple.put(1, DatumFactory.createFloat4((float) i));
- vTuple.put(2, DatumFactory.createText(String.valueOf(i)));
- appender.addTuple(vTuple);
- }
- appender.close();
-
- TableStats stat = appender.getStats();
- assertEquals(tupleNum, stat.getNumRows().longValue());
- tablePath = tablePath.suffix(extension);
- FileStatus status = fs.getFileStatus(tablePath);
- long fileLen = status.getLen();
- FileFragment[] tablets = new FileFragment[1];
- tablets[0] = new FileFragment(fileName, tablePath, 0, fileLen);
-
- Scanner scanner = StorageManager.getFileStorageManager(conf).getScanner(meta, schema, tablets[0], schema);
-
- if (StoreType.CSV == storeType) {
- if (SplittableCompressionCodec.class.isAssignableFrom(codec)) {
- assertTrue(scanner.isSplittable());
- } else {
- assertFalse(scanner.isSplittable());
- }
- }
- scanner.init();
-
- if (storeType == StoreType.SEQUENCEFILE) {
- assertTrue(scanner instanceof SequenceFileScanner);
- Writable key = ((SequenceFileScanner) scanner).getKey();
- assertEquals(key.getClass().getCanonicalName(), LongWritable.class.getCanonicalName());
- }
-
- int tupleCnt = 0;
- Tuple tuple;
- while ((tuple = scanner.next()) != null) {
- tupleCnt++;
- }
- scanner.close();
- assertEquals(tupleNum, tupleCnt);
- assertNotSame(appender.getStats().getNumBytes().longValue(), scanner.getInputStats().getNumBytes().longValue());
- assertEquals(appender.getStats().getNumRows().longValue(), scanner.getInputStats().getNumRows().longValue());
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/test/java/org/apache/tajo/storage/TestFileSystems.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestFileSystems.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestFileSystems.java
deleted file mode 100644
index 17a8da7..0000000
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestFileSystems.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.s3.S3FileSystem;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.tajo.catalog.CatalogUtil;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
-import org.apache.tajo.common.TajoDataTypes.Type;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.storage.fragment.Fragment;
-import org.apache.tajo.storage.s3.InMemoryFileSystemStore;
-import org.apache.tajo.storage.s3.SmallBlockS3FileSystem;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.io.IOException;
-import java.net.URI;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-@RunWith(Parameterized.class)
-public class TestFileSystems {
-
- protected byte[] data = null;
-
- private static String TEST_PATH = "target/test-data/TestFileSystem";
- private TajoConf conf = null;
- private FileStorageManager sm = null;
- private FileSystem fs = null;
- Path testDir;
-
- public TestFileSystems(FileSystem fs) throws IOException {
- conf = new TajoConf();
-
- if(fs instanceof S3FileSystem){
- conf.set(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, "10");
- fs.initialize(URI.create(fs.getScheme() + ":///"), conf);
- }
- this.fs = fs;
- sm = StorageManager.getFileStorageManager(conf);
- testDir = getTestDir(this.fs, TEST_PATH);
- }
-
- public Path getTestDir(FileSystem fs, String dir) throws IOException {
- Path path = new Path(dir);
- if(fs.exists(path))
- fs.delete(path, true);
-
- fs.mkdirs(path);
-
- return fs.makeQualified(path);
- }
-
- @Parameterized.Parameters
- public static Collection<Object[]> generateParameters() {
- return Arrays.asList(new Object[][] {
- {new SmallBlockS3FileSystem(new InMemoryFileSystemStore())},
- });
- }
-
- @Test
- public void testBlockSplit() throws IOException {
-
- Schema schema = new Schema();
- schema.addColumn("id", Type.INT4);
- schema.addColumn("age", Type.INT4);
- schema.addColumn("name", Type.TEXT);
-
- TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV);
-
- Tuple[] tuples = new Tuple[4];
- for (int i = 0; i < tuples.length; i++) {
- tuples[i] = new VTuple(3);
- tuples[i]
- .put(new Datum[] { DatumFactory.createInt4(i),
- DatumFactory.createInt4(i + 32),
- DatumFactory.createText("name" + i) });
- }
-
- Path path = StorageUtil.concatPath(testDir, "testGetScannerAndAppender",
- "table.csv");
- fs.mkdirs(path.getParent());
-
- Appender appender = sm.getAppender(meta, schema, path);
- appender.init();
- for (Tuple t : tuples) {
- appender.addTuple(t);
- }
- appender.close();
- FileStatus fileStatus = fs.getFileStatus(path);
-
- List<Fragment> splits = sm.getSplits("table", meta, schema, path);
- int splitSize = (int) Math.ceil(fileStatus.getLen() / (double) fileStatus.getBlockSize());
- assertEquals(splitSize, splits.size());
-
- for (Fragment fragment : splits) {
- assertTrue(fragment.getLength() <= fileStatus.getBlockSize());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/test/java/org/apache/tajo/storage/TestFrameTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestFrameTuple.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestFrameTuple.java
deleted file mode 100644
index 387fed5..0000000
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestFrameTuple.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.DatumFactory;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Verifies that a {@link FrameTuple} exposes the columns of two tuples as one
- * contiguous tuple: the first tuple's columns followed by the second's.
- */
-public class TestFrameTuple {
-  /** Number of columns in each fixture tuple. */
-  private static final int COLUMNS = 11;
-
-  private Tuple tuple1;
-  private Tuple tuple2;
-
-  /**
-   * Builds one fixture tuple with a datum of each type exercised here.
-   * Both fixtures share these values, so column i of the second tuple
-   * appears at frame position COLUMNS + i with the same value as column i.
-   */
-  private static Tuple createSampleTuple() {
-    Tuple tuple = new VTuple(COLUMNS);
-    tuple.put(new Datum[] {
-        DatumFactory.createBool(true),
-        DatumFactory.createBit((byte) 0x99),
-        DatumFactory.createChar('9'),
-        DatumFactory.createInt2((short) 17),
-        DatumFactory.createInt4(59),
-        DatumFactory.createInt8(23L),
-        DatumFactory.createFloat4(77.9f),
-        DatumFactory.createFloat8(271.9f),
-        DatumFactory.createText("hyunsik"),
-        DatumFactory.createBlob("hyunsik".getBytes()),
-        DatumFactory.createInet4("192.168.0.1")
-    });
-    return tuple;
-  }
-
-  @Before
-  public void setUp() throws Exception {
-    tuple1 = createSampleTuple();
-    tuple2 = createSampleTuple();
-  }
-
-  @After
-  public void tearDown() throws Exception {
-  }
-
-  @Test
-  public final void testFrameTuple() {
-    Tuple frame = new FrameTuple(tuple1, tuple2);
-    assertEquals(2 * COLUMNS, frame.size());
-    for (int i = 0; i < 2 * COLUMNS; i++) {
-      assertTrue(frame.contains(i));
-    }
-
-    // Column 5 (INT8) and column 10 (INET4) of each half.
-    assertEquals(DatumFactory.createInt8(23L), frame.get(5));
-    assertEquals(DatumFactory.createInt8(23L), frame.get(COLUMNS + 5));
-    assertEquals(DatumFactory.createInet4("192.168.0.1"), frame.get(10));
-    assertEquals(DatumFactory.createInet4("192.168.0.1"), frame.get(COLUMNS + 10));
-  }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/test/java/org/apache/tajo/storage/TestLazyTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestLazyTuple.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestLazyTuple.java
deleted file mode 100644
index c6149f7..0000000
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestLazyTuple.java
+++ /dev/null
@@ -1,258 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.common.TajoDataTypes;
-import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.datum.NullDatum;
-import org.apache.tajo.util.BytesUtils;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-/**
- * Tests for {@link LazyTuple}: lazy decoding of a delimited text row, explicit puts,
- * equality/hashing against {@link VTuple}, and cloning.
- */
-public class TestLazyTuple {
-
-  Schema schema;
-  byte[][] textRow;
-  byte[] nullbytes;
-  SerializerDeserializer serde;
-
-  @Before
-  public void setUp() {
-    nullbytes = "\\N".getBytes();
-
-    schema = new Schema();
-    schema.addColumn("col1", TajoDataTypes.Type.BOOLEAN);
-    schema.addColumn("col2", TajoDataTypes.Type.BIT);
-    schema.addColumn("col3", TajoDataTypes.Type.CHAR, 7);
-    schema.addColumn("col4", TajoDataTypes.Type.INT2);
-    schema.addColumn("col5", TajoDataTypes.Type.INT4);
-    schema.addColumn("col6", TajoDataTypes.Type.INT8);
-    schema.addColumn("col7", TajoDataTypes.Type.FLOAT4);
-    schema.addColumn("col8", TajoDataTypes.Type.FLOAT8);
-    schema.addColumn("col9", TajoDataTypes.Type.TEXT);
-    schema.addColumn("col10", TajoDataTypes.Type.BLOB);
-    schema.addColumn("col11", TajoDataTypes.Type.INET4);
-    schema.addColumn("col12", TajoDataTypes.Type.INT4);
-    schema.addColumn("col13", TajoDataTypes.Type.NULL_TYPE);
-
-    // A '|'-delimited text row with one field per column; the last two fields
-    // (the "\N" marker and NullDatum's text form) are expected to decode as NULL.
-    StringBuilder sb = new StringBuilder();
-    sb.append(DatumFactory.createBool(true)).append('|');
-    sb.append(new String(DatumFactory.createBit((byte) 0x99).asTextBytes())).append('|');
-    sb.append(DatumFactory.createChar("str")).append('|');
-    sb.append(DatumFactory.createInt2((short) 17)).append('|');
-    sb.append(DatumFactory.createInt4(59)).append('|');
-    sb.append(DatumFactory.createInt8(23L)).append('|');
-    sb.append(DatumFactory.createFloat4(77.9f)).append('|');
-    sb.append(DatumFactory.createFloat8(271.9f)).append('|');
-    sb.append(DatumFactory.createText("str2")).append('|');
-    sb.append(DatumFactory.createBlob("jinho".getBytes())).append('|');
-    sb.append(DatumFactory.createInet4("192.168.0.1")).append('|');
-    sb.append(new String(nullbytes)).append('|');
-    sb.append(NullDatum.get());
-    textRow = BytesUtils.splitPreserveAllTokens(sb.toString().getBytes(), '|');
-    serde = new TextSerializerDeserializer();
-  }
-
-  @Test
-  public void testGetDatum() {
-
-    LazyTuple t1 = new LazyTuple(schema, textRow, -1, nullbytes, serde);
-    assertEquals(DatumFactory.createBool(true), t1.get(0));
-    assertEquals(DatumFactory.createBit((byte) 0x99), t1.get(1));
-    assertEquals(DatumFactory.createChar("str"), t1.get(2));
-    assertEquals(DatumFactory.createInt2((short) 17), t1.get(3));
-    assertEquals(DatumFactory.createInt4(59), t1.get(4));
-    assertEquals(DatumFactory.createInt8(23L), t1.get(5));
-    assertEquals(DatumFactory.createFloat4(77.9f), t1.get(6));
-    assertEquals(DatumFactory.createFloat8(271.9f), t1.get(7));
-    assertEquals(DatumFactory.createText("str2"), t1.get(8));
-    assertEquals(DatumFactory.createBlob("jinho".getBytes()), t1.get(9));
-    assertEquals(DatumFactory.createInet4("192.168.0.1"), t1.get(10));
-    assertEquals(NullDatum.get(), t1.get(11));
-    assertEquals(NullDatum.get(), t1.get(12));
-  }
-
-  @Test
-  public void testContain() {
-    int colNum = schema.size();
-
-    LazyTuple t1 = new LazyTuple(schema, new byte[colNum][], -1);
-    t1.put(0, DatumFactory.createInt4(1));
-    t1.put(3, DatumFactory.createInt4(1));
-    t1.put(7, DatumFactory.createInt4(1));
-
-    assertTrue(t1.contains(0));
-    assertFalse(t1.contains(1));
-    assertFalse(t1.contains(2));
-    assertTrue(t1.contains(3));
-    assertFalse(t1.contains(4));
-    assertFalse(t1.contains(5));
-    assertFalse(t1.contains(6));
-    assertTrue(t1.contains(7));
-    assertFalse(t1.contains(8));
-    assertFalse(t1.contains(9));
-    assertFalse(t1.contains(10));
-    assertFalse(t1.contains(11));
-    assertFalse(t1.contains(12));
-  }
-
-  @Test
-  public void testPut() {
-    int colNum = schema.size();
-    LazyTuple t1 = new LazyTuple(schema, new byte[colNum][], -1);
-    t1.put(0, DatumFactory.createText("str"));
-    t1.put(1, DatumFactory.createInt4(2));
-    t1.put(11, DatumFactory.createFloat4(0.76f));
-
-    assertTrue(t1.contains(0));
-    assertTrue(t1.contains(1));
-
-    // Expected value first, so failure messages read correctly.
-    assertEquals("str", t1.getText(0));
-    assertEquals(2, t1.get(1).asInt4());
-    assertEquals(0.76f, t1.get(11).asFloat4(), 0.0f);
-  }
-
-  @Test
-  public void testEquals() {
-    int colNum = schema.size();
-    LazyTuple t1 = new LazyTuple(schema, new byte[colNum][], -1);
-    LazyTuple t2 = new LazyTuple(schema, new byte[colNum][], -1);
-
-    t1.put(0, DatumFactory.createInt4(1));
-    t1.put(1, DatumFactory.createInt4(2));
-    t1.put(3, DatumFactory.createInt4(2));
-
-    t2.put(0, DatumFactory.createInt4(1));
-    t2.put(1, DatumFactory.createInt4(2));
-    t2.put(3, DatumFactory.createInt4(2));
-
-    assertEquals(t1, t2);
-
-    Tuple t3 = new VTuple(colNum);
-    t3.put(0, DatumFactory.createInt4(1));
-    t3.put(1, DatumFactory.createInt4(2));
-    t3.put(3, DatumFactory.createInt4(2));
-    assertEquals(t1, t3);
-    assertEquals(t2, t3);
-
-    // An empty tuple must not compare equal to a populated one. (assertNotSame would only
-    // check object identity, which is trivially true for two distinct instances.)
-    LazyTuple t4 = new LazyTuple(schema, new byte[colNum][], -1);
-    assertFalse(t1.equals(t4));
-  }
-
-  @Test
-  public void testHashCode() {
-    int colNum = schema.size();
-    LazyTuple t1 = new LazyTuple(schema, new byte[colNum][], -1);
-    LazyTuple t2 = new LazyTuple(schema, new byte[colNum][], -1);
-
-    t1.put(0, DatumFactory.createInt4(1));
-    t1.put(1, DatumFactory.createInt4(2));
-    t1.put(3, DatumFactory.createInt4(2));
-    t1.put(4, DatumFactory.createText("str"));
-
-    t2.put(0, DatumFactory.createInt4(1));
-    t2.put(1, DatumFactory.createInt4(2));
-    t2.put(3, DatumFactory.createInt4(2));
-    t2.put(4, DatumFactory.createText("str"));
-
-    assertEquals(t1.hashCode(), t2.hashCode());
-
-    Tuple t3 = new VTuple(colNum);
-    t3.put(0, DatumFactory.createInt4(1));
-    t3.put(1, DatumFactory.createInt4(2));
-    t3.put(3, DatumFactory.createInt4(2));
-    t3.put(4, DatumFactory.createText("str"));
-    assertEquals(t1.hashCode(), t3.hashCode());
-    assertEquals(t2.hashCode(), t3.hashCode());
-
-    Tuple t4 = new VTuple(5);
-    t4.put(0, DatumFactory.createInt4(1));
-    t4.put(1, DatumFactory.createInt4(2));
-    t4.put(4, DatumFactory.createInt4(2));
-
-    // Compare the int values; assertNotSame on autoboxed ints compares references instead.
-    assertFalse(t1.hashCode() == t4.hashCode());
-  }
-
-  @Test
-  public void testPutTuple() {
-    int colNum = schema.size();
-    LazyTuple t1 = new LazyTuple(schema, new byte[colNum][], -1);
-
-    t1.put(0, DatumFactory.createInt4(1));
-    t1.put(1, DatumFactory.createInt4(2));
-    t1.put(2, DatumFactory.createInt4(3));
-
-
-    Schema schema2 = new Schema();
-    schema2.addColumn("col1", TajoDataTypes.Type.INT8);
-    schema2.addColumn("col2", TajoDataTypes.Type.INT8);
-
-    LazyTuple t2 = new LazyTuple(schema2, new byte[schema2.size()][], -1);
-    t2.put(0, DatumFactory.createInt4(4));
-    t2.put(1, DatumFactory.createInt4(5));
-
-    // Putting a tuple at index 3 places its columns at positions 3 and 4.
-    t1.put(3, t2);
-
-    for (int i = 0; i < 5; i++) {
-      assertEquals(i + 1, t1.get(i).asInt4());
-    }
-  }
-
-  @Test
-  public void testInvalidNumber() {
-    // Whitespace-padded, whitespace-only, and empty numeric fields all decode as NULL.
-    byte[][] bytes = BytesUtils.splitPreserveAllTokens(" 1| |2 ||".getBytes(), '|');
-    Schema numericSchema = new Schema();
-    numericSchema.addColumn("col1", TajoDataTypes.Type.INT2);
-    numericSchema.addColumn("col2", TajoDataTypes.Type.INT4);
-    numericSchema.addColumn("col3", TajoDataTypes.Type.INT8);
-    numericSchema.addColumn("col4", TajoDataTypes.Type.FLOAT4);
-    numericSchema.addColumn("col5", TajoDataTypes.Type.FLOAT8);
-
-    LazyTuple tuple = new LazyTuple(numericSchema, bytes, 0);
-    assertEquals(bytes.length, tuple.size());
-
-    for (int i = 0; i < tuple.size(); i++) {
-      assertEquals(NullDatum.get(), tuple.get(i));
-    }
-  }
-
-  @Test
-  public void testClone() throws CloneNotSupportedException {
-    int colNum = schema.size();
-    LazyTuple t1 = new LazyTuple(schema, new byte[colNum][], -1);
-
-    t1.put(0, DatumFactory.createInt4(1));
-    t1.put(1, DatumFactory.createInt4(2));
-    t1.put(3, DatumFactory.createInt4(2));
-    t1.put(4, DatumFactory.createText("str"));
-
-    LazyTuple t2 = (LazyTuple) t1.clone();
-    assertNotSame(t1, t2);
-    assertEquals(t1, t2);
-
-    // The clone shares datum instances with the original...
-    assertSame(t1.get(4), t2.get(4));
-
-    // ...but clearing the original must not clear the clone.
-    t1.clear();
-    assertFalse(t1.equals(t2));
-  }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/test/java/org/apache/tajo/storage/TestLineReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestLineReader.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestLineReader.java
deleted file mode 100644
index 1a4bdba..0000000
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestLineReader.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import io.netty.buffer.ByteBuf;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.compress.DeflateCodec;
-import org.apache.tajo.catalog.CatalogUtil;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
-import org.apache.tajo.common.TajoDataTypes.Type;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.datum.NullDatum;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.storage.text.ByteBufLineReader;
-import org.apache.tajo.storage.text.DelimitedTextFile;
-import org.apache.tajo.storage.text.DelimitedLineReader;
-import org.apache.tajo.util.CommonTestingUtil;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.junit.Assert.*;
-
-/**
- * Tests the text line readers: {@link ByteBufLineReader} over an uncompressed file and
- * {@link DelimitedLineReader} over a Deflate-compressed file.
- */
-public class TestLineReader {
-  private static final String TEST_PATH = "target/test-data/TestLineReader";
-
-  /** Schema shared by both tests: id, age, and two text columns. */
-  private static Schema createSchema() {
-    Schema schema = new Schema();
-    schema.addColumn("id", Type.INT4);
-    schema.addColumn("age", Type.INT8);
-    schema.addColumn("comment", Type.TEXT);
-    schema.addColumn("comment2", Type.TEXT);
-    return schema;
-  }
-
-  /** Builds the i-th row written by both tests; the last column is always NULL. */
-  private static VTuple createRow(int i) {
-    VTuple row = new VTuple(4);
-    row.put(0, DatumFactory.createInt4(i + 1));
-    row.put(1, DatumFactory.createInt8(25L));
-    row.put(2, DatumFactory.createText("emiya muljomdao"));
-    row.put(3, NullDatum.get());
-    return row;
-  }
-
-  @Test
-  public void testByteBufLineReader() throws IOException {
-    TajoConf conf = new TajoConf();
-    Path testDir = CommonTestingUtil.getTestDir(TEST_PATH);
-    FileSystem fs = testDir.getFileSystem(conf);
-
-    Schema schema = createSchema();
-    TableMeta meta = CatalogUtil.newTableMeta(StoreType.TEXTFILE);
-    Path tablePath = new Path(testDir, "line.data");
-    FileAppender appender = (FileAppender) StorageManager.getFileStorageManager(conf).getAppender(meta, schema,
-        tablePath);
-    appender.enableStats();
-    appender.init();
-    int tupleNum = 10000;
-    for (int i = 0; i < tupleNum; i++) {
-      appender.addTuple(createRow(i));
-    }
-    appender.close();
-
-    FileStatus status = fs.getFileStatus(tablePath);
-
-    ByteBufInputChannel channel = new ByteBufInputChannel(fs.open(tablePath));
-    ByteBufLineReader reader = null;
-    long totalRead = 0;
-    int lineCount = 0;
-    try {
-      assertEquals(status.getLen(), channel.available());
-      reader = new ByteBufLineReader(channel);
-      assertEquals(status.getLen(), reader.available());
-
-      // readLineBuf reports each line's byte count (including its terminator) through 'bytes'.
-      AtomicInteger bytes = new AtomicInteger();
-      while (reader.readLineBuf(bytes) != null) {
-        totalRead += bytes.get();
-        lineCount++;
-      }
-    } finally {
-      // Close in finally so a failed assertion above does not leak the open stream.
-      IOUtils.cleanup(null, reader, channel, fs);
-    }
-    assertEquals(tupleNum, lineCount);
-    assertEquals(status.getLen(), totalRead);
-    assertEquals(status.getLen(), reader.readBytes());
-  }
-
-  @Test
-  public void testLineDelimitedReader() throws IOException {
-    TajoConf conf = new TajoConf();
-    Path testDir = CommonTestingUtil.getTestDir(TEST_PATH);
-    FileSystem fs = testDir.getFileSystem(conf);
-
-    Schema schema = createSchema();
-    TableMeta meta = CatalogUtil.newTableMeta(StoreType.TEXTFILE);
-    meta.putOption("compression.codec", DeflateCodec.class.getCanonicalName());
-
-    Path tablePath = new Path(testDir, "line1." + DeflateCodec.class.getSimpleName());
-    FileAppender appender = (FileAppender) StorageManager.getFileStorageManager(conf).getAppender(meta, schema,
-        tablePath);
-    appender.enableStats();
-    appender.init();
-    int tupleNum = 10000;
-
-    long splitOffset = 0;
-    for (int i = 0; i < tupleNum; i++) {
-      appender.addTuple(createRow(i));
-
-      // Record the offset just after the middle row; the fragment below ends there.
-      if (i == (tupleNum / 2)) {
-        splitOffset = appender.getOffset();
-      }
-    }
-    String extension = ((DelimitedTextFile.DelimitedTextFileAppender) appender).getExtension();
-    appender.close();
-
-    tablePath = tablePath.suffix(extension);
-    FileFragment fragment = new FileFragment("table", tablePath, 0, splitOffset);
-    // The file is compressed, so the reader reads to EOF regardless of the fragment's end.
-    DelimitedLineReader reader = new DelimitedLineReader(conf, fragment);
-    int lineCount = 0;
-    try {
-      assertTrue(reader.isCompressed());
-      assertFalse(reader.isReadable());
-      reader.init();
-      assertTrue(reader.isReadable());
-
-      while (reader.isReadable()) {
-        ByteBuf buf = reader.readLine();
-        if (buf == null) {
-          break;
-        }
-        lineCount++;
-      }
-    } finally {
-      IOUtils.cleanup(null, reader, fs);
-    }
-    assertEquals(tupleNum, lineCount);
-  }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestMergeScanner.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
deleted file mode 100644
index cc4aa51..0000000
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.catalog.CatalogUtil;
-import org.apache.tajo.storage.fragment.Fragment;
-import org.apache.tajo.util.KeyValueSet;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.common.TajoDataTypes.Type;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.conf.TajoConf.ConfVars;
-import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.util.CommonTestingUtil;
-import org.apache.tajo.util.TUtil;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
-
-import static org.junit.Assert.*;
-
-@RunWith(Parameterized.class)
-public class TestMergeScanner {
- private TajoConf conf;
- StorageManager sm;
- private static String TEST_PATH = "target/test-data/TestMergeScanner";
-
- private static String TEST_MULTIPLE_FILES_AVRO_SCHEMA =
- "{\n" +
- " \"type\": \"record\",\n" +
- " \"namespace\": \"org.apache.tajo\",\n" +
- " \"name\": \"testMultipleFiles\",\n" +
- " \"fields\": [\n" +
- " { \"name\": \"id\", \"type\": \"int\" },\n" +
- " { \"name\": \"file\", \"type\": \"string\" },\n" +
- " { \"name\": \"name\", \"type\": \"string\" },\n" +
- " { \"name\": \"age\", \"type\": \"long\" }\n" +
- " ]\n" +
- "}\n";
-
- private Path testDir;
- private StoreType storeType;
- private FileSystem fs;
-
- public TestMergeScanner(StoreType storeType) {
- this.storeType = storeType;
- }
-
- @Parameters
- public static Collection<Object[]> generateParameters() {
- return Arrays.asList(new Object[][] {
- {StoreType.CSV},
- {StoreType.RAW},
- {StoreType.RCFILE},
- {StoreType.PARQUET},
- {StoreType.SEQUENCEFILE},
- {StoreType.AVRO},
- // RowFile requires Byte-buffer read support, so we omitted RowFile.
- //{StoreType.ROWFILE},
- });
- }
-
- @Before
- public void setup() throws Exception {
- conf = new TajoConf();
- conf.setVar(ConfVars.ROOT_DIR, TEST_PATH);
- conf.setStrings("tajo.storage.projectable-scanner", "rcfile", "parquet", "avro");
- testDir = CommonTestingUtil.getTestDir(TEST_PATH);
- fs = testDir.getFileSystem(conf);
- sm = StorageManager.getFileStorageManager(conf, testDir);
- }
-
- @Test
- public void testMultipleFiles() throws IOException {
- Schema schema = new Schema();
- schema.addColumn("id", Type.INT4);
- schema.addColumn("file", Type.TEXT);
- schema.addColumn("name", Type.TEXT);
- schema.addColumn("age", Type.INT8);
-
- KeyValueSet options = new KeyValueSet();
- TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
- meta.setOptions(CatalogUtil.newPhysicalProperties(storeType));
- if (storeType == StoreType.AVRO) {
- meta.putOption(StorageConstants.AVRO_SCHEMA_LITERAL,
- TEST_MULTIPLE_FILES_AVRO_SCHEMA);
- }
-
- Path table1Path = new Path(testDir, storeType + "_1.data");
- Appender appender1 = StorageManager.getFileStorageManager(conf).getAppender(meta, schema, table1Path);
- appender1.enableStats();
- appender1.init();
- int tupleNum = 10000;
- VTuple vTuple;
-
- for(int i = 0; i < tupleNum; i++) {
- vTuple = new VTuple(4);
- vTuple.put(0, DatumFactory.createInt4(i + 1));
- vTuple.put(1, DatumFactory.createText("hyunsik"));
- vTuple.put(2, DatumFactory.createText("jihoon"));
- vTuple.put(3, DatumFactory.createInt8(25l));
- appender1.addTuple(vTuple);
- }
- appender1.close();
-
- TableStats stat1 = appender1.getStats();
- if (stat1 != null) {
- assertEquals(tupleNum, stat1.getNumRows().longValue());
- }
-
- Path table2Path = new Path(testDir, storeType + "_2.data");
- Appender appender2 = StorageManager.getFileStorageManager(conf).getAppender(meta, schema, table2Path);
- appender2.enableStats();
- appender2.init();
-
- for(int i = 0; i < tupleNum; i++) {
- vTuple = new VTuple(4);
- vTuple.put(0, DatumFactory.createInt4(i + 1));
- vTuple.put(1, DatumFactory.createText("hyunsik"));
- vTuple.put(2, DatumFactory.createText("jihoon"));
- vTuple.put(3, DatumFactory.createInt8(25l));
- appender2.addTuple(vTuple);
- }
- appender2.close();
-
- TableStats stat2 = appender2.getStats();
- if (stat2 != null) {
- assertEquals(tupleNum, stat2.getNumRows().longValue());
- }
-
-
- FileStatus status1 = fs.getFileStatus(table1Path);
- FileStatus status2 = fs.getFileStatus(table2Path);
- Fragment[] fragment = new Fragment[2];
- fragment[0] = new FileFragment("tablet1", table1Path, 0, status1.getLen());
- fragment[1] = new FileFragment("tablet1", table2Path, 0, status2.getLen());
-
- Schema targetSchema = new Schema();
- targetSchema.addColumn(schema.getColumn(0));
- targetSchema.addColumn(schema.getColumn(2));
-
- Scanner scanner = new MergeScanner(conf, schema, meta, TUtil.newList(fragment), targetSchema);
- assertEquals(isProjectableStorage(meta.getStoreType()), scanner.isProjectable());
-
- scanner.init();
- int totalCounts = 0;
- Tuple tuple;
- while ((tuple = scanner.next()) != null) {
- totalCounts++;
- if (isProjectableStorage(meta.getStoreType())) {
- assertNotNull(tuple.get(0));
- assertNull(tuple.get(1));
- assertNotNull(tuple.get(2));
- assertNull(tuple.get(3));
- }
- }
- scanner.close();
-
- assertEquals(tupleNum * 2, totalCounts);
- }
-
- private static boolean isProjectableStorage(StoreType type) {
- switch (type) {
- case RCFILE:
- case PARQUET:
- case SEQUENCEFILE:
- case CSV:
- case AVRO:
- return true;
- default:
- return false;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/test/java/org/apache/tajo/storage/TestSplitProcessor.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestSplitProcessor.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestSplitProcessor.java
deleted file mode 100644
index 12ea551..0000000
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestSplitProcessor.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-import io.netty.util.CharsetUtil;
-import org.apache.tajo.storage.text.FieldSplitProcessor;
-import org.apache.tajo.storage.text.LineSplitProcessor;
-import org.junit.Test;
-
-import java.io.IOException;
-
-import static io.netty.util.ReferenceCountUtil.releaseLater;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Tests the Netty {@code ByteBufProcessor}s that locate field delimiters and line
- * terminators. {@code forEachByte} returns the index of the first matching byte, or -1.
- */
-public class TestSplitProcessor {
-
-  @Test
-  public void testFieldSplitProcessor() throws IOException {
-    String data = "abc||de";
-    final ByteBuf buf = releaseLater(
-        Unpooled.copiedBuffer(data, CharsetUtil.ISO_8859_1));
-
-    final int len = buf.readableBytes();
-    FieldSplitProcessor processor = new FieldSplitProcessor('|');
-
-    // First delimiter, then the adjacent second one (empty field), then none remaining.
-    assertEquals(3, buf.forEachByte(0, len, processor));
-    assertEquals(4, buf.forEachByte(4, len - 4, processor));
-    assertEquals(-1, buf.forEachByte(5, len - 5, processor));
-  }
-
-  @Test
-  public void testLineSplitProcessor() throws IOException {
-    String data = "abc\r\n\n";
-    final ByteBuf buf = releaseLater(
-        Unpooled.copiedBuffer(data, CharsetUtil.ISO_8859_1));
-
-    final int len = buf.readableBytes();
-    LineSplitProcessor processor = new LineSplitProcessor();
-
-    // find CR
-    assertEquals(3, buf.forEachByte(0, len, processor));
-
-    // find the LF of the CRLF pair
-    assertEquals(4, buf.forEachByte(4, len - 4, processor));
-    assertEquals('\n', buf.getByte(4));
-    // the processor remembers the preceding CR, so the caller can skip this LF
-    assertTrue(processor.isPrevCharCR());
-
-    // find the standalone LF (an empty line)
-    assertEquals(5, buf.forEachByte(5, len - 5, processor));
-  }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorageManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorageManager.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorageManager.java
deleted file mode 100644
index 13aeef6..0000000
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorageManager.java
+++ /dev/null
@@ -1,203 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import com.google.common.collect.Lists;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.*;
-import org.apache.tajo.catalog.CatalogUtil;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
-import org.apache.tajo.common.TajoDataTypes.Type;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.storage.fragment.Fragment;
-import org.apache.tajo.util.CommonTestingUtil;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.List;
-import java.util.UUID;
-
-import static org.junit.Assert.*;
-
-public class TestStorageManager {
- private TajoConf conf;
- private static String TEST_PATH = "target/test-data/TestStorageManager";
- StorageManager sm = null;
- private Path testDir;
- private FileSystem fs;
-
- @Before
- public void setUp() throws Exception {
- conf = new TajoConf();
- testDir = CommonTestingUtil.getTestDir(TEST_PATH);
- fs = testDir.getFileSystem(conf);
- sm = StorageManager.getFileStorageManager(conf, testDir);
- }
-
- @After
- public void tearDown() throws Exception {
- }
-
- @Test
- public final void testGetScannerAndAppender() throws IOException {
- Schema schema = new Schema();
- schema.addColumn("id", Type.INT4);
- schema.addColumn("age",Type.INT4);
- schema.addColumn("name",Type.TEXT);
-
- TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV);
-
- Tuple[] tuples = new Tuple[4];
- for(int i=0; i < tuples.length; i++) {
- tuples[i] = new VTuple(3);
- tuples[i].put(new Datum[] {
- DatumFactory.createInt4(i),
- DatumFactory.createInt4(i + 32),
- DatumFactory.createText("name" + i)});
- }
-
- Path path = StorageUtil.concatPath(testDir, "testGetScannerAndAppender", "table.csv");
- fs.mkdirs(path.getParent());
- Appender appender = StorageManager.getFileStorageManager(conf).getAppender(meta, schema, path);
- appender.init();
- for(Tuple t : tuples) {
- appender.addTuple(t);
- }
- appender.close();
-
- Scanner scanner = StorageManager.getFileStorageManager(conf).getFileScanner(meta, schema, path);
- scanner.init();
- int i=0;
- while(scanner.next() != null) {
- i++;
- }
- assertEquals(4,i);
- }
-
- @Test
- public void testGetSplit() throws Exception {
- final Configuration conf = new HdfsConfiguration();
- String testDataPath = TEST_PATH + "/" + UUID.randomUUID().toString();
- conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, testDataPath);
- conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0);
- conf.setBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, false);
-
- final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
- .numDataNodes(1).build();
-
- int testCount = 10;
- Path tablePath = new Path("/testGetSplit");
- try {
- DistributedFileSystem fs = cluster.getFileSystem();
-
- // Create test partitions
- List<Path> partitions = Lists.newArrayList();
- for (int i =0; i < testCount; i++){
- Path tmpFile = new Path(tablePath, String.valueOf(i));
- DFSTestUtil.createFile(fs, new Path(tmpFile, "tmpfile.dat"), 10, (short) 2, 0xDEADDEADl);
- partitions.add(tmpFile);
- }
-
- assertTrue(fs.exists(tablePath));
- FileStorageManager sm = StorageManager.getFileStorageManager(new TajoConf(conf), tablePath);
-
- Schema schema = new Schema();
- schema.addColumn("id", Type.INT4);
- schema.addColumn("age",Type.INT4);
- schema.addColumn("name",Type.TEXT);
- TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV);
-
- List<Fragment> splits = Lists.newArrayList();
- // Get FileFragments in partition batch
- splits.addAll(sm.getSplits("data", meta, schema, partitions.toArray(new Path[partitions.size()])));
- assertEquals(testCount, splits.size());
- // -1 is unknown volumeId
- assertEquals(-1, ((FileFragment)splits.get(0)).getDiskIds()[0]);
-
- splits.clear();
- splits.addAll(sm.getSplits("data", meta, schema,
- partitions.subList(0, partitions.size() / 2).toArray(new Path[partitions.size() / 2])));
- assertEquals(testCount / 2, splits.size());
- assertEquals(1, splits.get(0).getHosts().length);
- assertEquals(-1, ((FileFragment)splits.get(0)).getDiskIds()[0]);
- fs.close();
- } finally {
- cluster.shutdown();
-
- File dir = new File(testDataPath);
- dir.delete();
- }
- }
-
- @Test
- public void testGetSplitWithBlockStorageLocationsBatching() throws Exception {
- final Configuration conf = new HdfsConfiguration();
- String testDataPath = TEST_PATH + "/" + UUID.randomUUID().toString();
- conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, testDataPath);
- conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0);
- conf.setBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, true);
-
- final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
- .numDataNodes(2).build();
-
- int testCount = 10;
- Path tablePath = new Path("/testGetSplitWithBlockStorageLocationsBatching");
- try {
- DistributedFileSystem fs = cluster.getFileSystem();
-
- // Create test files
- for (int i = 0; i < testCount; i++) {
- Path tmpFile = new Path(tablePath, "tmpfile" + i + ".dat");
- DFSTestUtil.createFile(fs, tmpFile, 10, (short) 2, 0xDEADDEADl);
- }
- assertTrue(fs.exists(tablePath));
- FileStorageManager sm = StorageManager.getFileStorageManager(new TajoConf(conf), tablePath);
-
- Schema schema = new Schema();
- schema.addColumn("id", Type.INT4);
- schema.addColumn("age", Type.INT4);
- schema.addColumn("name", Type.TEXT);
- TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV);
-
- List<Fragment> splits = Lists.newArrayList();
- splits.addAll(sm.getSplits("data", meta, schema, tablePath));
-
- assertEquals(testCount, splits.size());
- assertEquals(2, splits.get(0).getHosts().length);
- assertEquals(2, ((FileFragment)splits.get(0)).getDiskIds().length);
- assertNotEquals(-1, ((FileFragment)splits.get(0)).getDiskIds()[0]);
- fs.close();
- } finally {
- cluster.shutdown();
-
- File dir = new File(testDataPath);
- dir.delete();
- }
- }
-}