You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2015/01/11 05:30:00 UTC
tajo git commit: TAJO-1295: Remove legacy worker.dataserver package
and its unit tests.
Repository: tajo
Updated Branches:
refs/heads/master c45d0ef02 -> bc478ba83
TAJO-1295: Remove legacy worker.dataserver package and its unit tests.
Closes #345
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/bc478ba8
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/bc478ba8
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/bc478ba8
Branch: refs/heads/master
Commit: bc478ba834e9bba768155faa53f918e495a74671
Parents: c45d0ef
Author: Hyunsik Choi <hy...@apache.org>
Authored: Sun Jan 11 02:04:29 2015 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Sun Jan 11 02:04:29 2015 +0900
----------------------------------------------------------------------
CHANGES | 3 +
.../apache/tajo/worker/InterDataRetriever.java | 113 ------
.../tajo/worker/PartitionRetrieverHandler.java | 44 ---
.../tajo/worker/RangeRetrieverHandler.java | 163 --------
.../FileAccessForbiddenException.java | 40 --
.../tajo/worker/dataserver/HttpDataServer.java | 87 -----
.../dataserver/HttpDataServerHandler.java | 199 ----------
.../HttpDataServerPipelineFactory.java | 55 ---
.../apache/tajo/worker/dataserver/HttpUtil.java | 69 ----
.../retriever/AdvancedDataRetriever.java | 128 -------
.../dataserver/retriever/DataRetriever.java | 29 --
.../retriever/DirectoryRetriever.java | 56 ---
.../worker/dataserver/retriever/FileChunk.java | 51 ---
.../dataserver/retriever/RetrieverHandler.java | 33 --
.../planner/physical/TestPhysicalPlanner.java | 103 +----
.../tajo/worker/TestRangeRetrieverHandler.java | 381 -------------------
.../worker/dataserver/TestHttpDataServer.java | 172 ---------
.../tajo/worker/dataserver/TestHttpUtil.java | 49 ---
18 files changed, 4 insertions(+), 1771 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/bc478ba8/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 5f427a9..5ff3143 100644
--- a/CHANGES
+++ b/CHANGES
@@ -284,6 +284,9 @@ Release 0.9.1 - unreleased
TASKS
+ TAJO-1295: Remove legacy worker.dataserver package and its unit tests.
+ (hyunsik)
+
TAJO-1296: Remove obsolete classes from tajo.master.container package.
(hyunsik)
http://git-wip-us.apache.org/repos/asf/tajo/blob/bc478ba8/tajo-core/src/main/java/org/apache/tajo/worker/InterDataRetriever.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/InterDataRetriever.java b/tajo-core/src/main/java/org/apache/tajo/worker/InterDataRetriever.java
deleted file mode 100644
index 5b2ad0f..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/worker/InterDataRetriever.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- *
- */
-package org.apache.tajo.worker;
-
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.handler.codec.http.HttpRequest;
-import org.apache.tajo.TaskId;
-import org.apache.tajo.worker.dataserver.FileAccessForbiddenException;
-import org.apache.tajo.worker.dataserver.retriever.DataRetriever;
-import org.apache.tajo.worker.dataserver.retriever.FileChunk;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.Map;
-import java.util.Set;
-
-@Deprecated
-public class InterDataRetriever implements DataRetriever {
- private final Log LOG = LogFactory.getLog(InterDataRetriever.class);
- private final Set<TaskId> registered = Sets.newHashSet();
- private final Map<String, String> map = Maps.newConcurrentMap();
-
- public InterDataRetriever() {
- }
-
- public void register(TaskId id, String baseURI) {
- synchronized (registered) {
- if (!registered.contains(id)) {
- map.put(id.toString(), baseURI);
- registered.add(id);
- }
- }
- }
-
- public void unregister(TaskId id) {
- synchronized (registered) {
- if (registered.contains(id)) {
- map.remove(id.toString());
- registered.remove(id);
- }
- }
- }
-
- /* (non-Javadoc)
- * @see org.apache.tajo.worker.dataserver.retriever.DataRetriever#handle(org.jboss.netty.channel.ChannelHandlerContext, org.jboss.netty.handler.codec.http.HttpRequest)
- */
- @Override
- public FileChunk [] handle(ChannelHandlerContext ctx, HttpRequest request)
- throws IOException {
-
- int start = request.getUri().indexOf('?');
- if (start < 0) {
- throw new IllegalArgumentException("Wrong request: " + request.getUri());
- }
-
- String queryStr = request.getUri().substring(start + 1);
- LOG.info("QUERY: " + queryStr);
- String [] queries = queryStr.split("&");
-
- String qid = null;
- String fn = null;
- String [] kv;
- for (String query : queries) {
- kv = query.split("=");
- if (kv[0].equals("qid")) {
- qid = kv[1];
- } else if (kv[0].equals("fn")) {
- fn = kv[1];
- }
- }
-
- String baseDir = map.get(qid);
- if (baseDir == null) {
- throw new FileNotFoundException("No such qid: " + qid);
- }
-
- File file = new File(baseDir + "/" + fn);
- if (file.isHidden() || !file.exists()) {
- throw new FileNotFoundException("No such file: " + baseDir + "/"
- + file.getName());
- }
- if (!file.isFile()) {
- throw new FileAccessForbiddenException("No such file: "
- + baseDir + "/" + file.getName());
- }
-
- return new FileChunk[] {new FileChunk(file, 0, file.length())};
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/bc478ba8/tajo-core/src/main/java/org/apache/tajo/worker/PartitionRetrieverHandler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/PartitionRetrieverHandler.java b/tajo-core/src/main/java/org/apache/tajo/worker/PartitionRetrieverHandler.java
deleted file mode 100644
index 36e7353..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/worker/PartitionRetrieverHandler.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.worker;
-
-import org.apache.tajo.worker.dataserver.retriever.FileChunk;
-import org.apache.tajo.worker.dataserver.retriever.RetrieverHandler;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-public class PartitionRetrieverHandler implements RetrieverHandler {
- private final String baseDir;
-
- public PartitionRetrieverHandler(String baseDir) {
- this.baseDir = baseDir;
- }
-
- @Override
- public FileChunk get(Map<String, List<String>> kvs) throws IOException {
- // nothing to verify the file because AdvancedDataRetriever checks
- // its validity of the file.
- File file = new File(baseDir + "/" + kvs.get("fn").get(0));
-
- return new FileChunk(file, 0, file.length());
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/bc478ba8/tajo-core/src/main/java/org/apache/tajo/worker/RangeRetrieverHandler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/RangeRetrieverHandler.java b/tajo-core/src/main/java/org/apache/tajo/worker/RangeRetrieverHandler.java
deleted file mode 100644
index 2b58196..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/worker/RangeRetrieverHandler.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.worker;
-
-import org.apache.commons.codec.binary.Base64;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.storage.BaseTupleComparator;
-import org.apache.tajo.storage.RowStoreUtil;
-import org.apache.tajo.storage.RowStoreUtil.RowStoreDecoder;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.index.bst.BSTIndex;
-import org.apache.tajo.worker.dataserver.retriever.FileChunk;
-import org.apache.tajo.worker.dataserver.retriever.RetrieverHandler;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-/**
- *
- * It retrieves the file chunk ranged between start and end keys.
- * The start key is inclusive, but the end key is exclusive.
- *
- * Internally, there are four cases:
- * <ul>
- * <li>out of scope: the index range does not overlapped with the query range.</li>
- * <li>overlapped: the index range is partially overlapped with the query range. </li>
- * <li>included: the index range is included in the start and end keys</li>
- * <li>covered: the index range covers the query range (i.e., start and end keys).</li>
- * </ul>
- */
-public class RangeRetrieverHandler implements RetrieverHandler {
- private static final Log LOG = LogFactory.getLog(RangeRetrieverHandler.class);
- private final File file;
- private final BSTIndex.BSTIndexReader idxReader;
- private final Schema schema;
- private final BaseTupleComparator comp;
- private final RowStoreDecoder decoder;
-
- public RangeRetrieverHandler(File outDir, Schema schema, BaseTupleComparator comp) throws IOException {
- this.file = outDir;
- BSTIndex index = new BSTIndex(new TajoConf());
- this.schema = schema;
- this.comp = comp;
- FileSystem fs = FileSystem.getLocal(new Configuration());
- Path indexPath = fs.makeQualified(new Path(outDir.getCanonicalPath(), "index"));
- this.idxReader =
- index.getIndexReader(indexPath, this.schema, this.comp);
- this.idxReader.open();
- LOG.info("BSTIndex is loaded from disk (" + idxReader.getFirstKey() + ", "
- + idxReader.getLastKey());
- this.decoder = RowStoreUtil.createDecoder(schema);
- }
-
- @Override
- public FileChunk get(Map<String, List<String>> kvs) throws IOException {
- // nothing to verify the file because AdvancedDataRetriever checks
- // its validity of the file.
- File data = new File(this.file, "data/data");
- byte [] startBytes = Base64.decodeBase64(kvs.get("start").get(0));
- Tuple start = decoder.toTuple(startBytes);
- byte [] endBytes;
- Tuple end;
- endBytes = Base64.decodeBase64(kvs.get("end").get(0));
- end = decoder.toTuple(endBytes);
- boolean last = kvs.containsKey("final");
-
- if(!comp.isAscendingFirstKey()) {
- Tuple tmpKey = start;
- start = end;
- end = tmpKey;
- }
-
- LOG.info("GET Request for " + data.getAbsolutePath() + " (start="+start+", end="+ end +
- (last ? ", last=true" : "") + ")");
-
- if (idxReader.getFirstKey() == null && idxReader.getLastKey() == null) { // if # of rows is zero
- LOG.info("There is no contents");
- return null;
- }
-
- if (comp.compare(end, idxReader.getFirstKey()) < 0 ||
- comp.compare(idxReader.getLastKey(), start) < 0) {
- LOG.info("Out of Scope (indexed data [" + idxReader.getFirstKey() + ", " + idxReader.getLastKey() +
- "], but request start:" + start + ", end: " + end);
- return null;
- }
-
- long startOffset;
- long endOffset;
- try {
- startOffset = idxReader.find(start);
- } catch (IOException ioe) {
- LOG.error("State Dump (the requested range: "
- + "[" + start + ", " + end+")" + ", idx min: " + idxReader.getFirstKey() + ", idx max: "
- + idxReader.getLastKey());
- throw ioe;
- }
- try {
- endOffset = idxReader.find(end);
- if (endOffset == -1) {
- endOffset = idxReader.find(end, true);
- }
- } catch (IOException ioe) {
- LOG.error("State Dump (the requested range: "
- + "[" + start + ", " + end+")" + ", idx min: " + idxReader.getFirstKey() + ", idx max: "
- + idxReader.getLastKey());
- throw ioe;
- }
-
- // if startOffset == -1 then case 2-1 or case 3
- if (startOffset == -1) { // this is a hack
- // if case 2-1 or case 3
- try {
- startOffset = idxReader.find(start, true);
- } catch (IOException ioe) {
- LOG.error("State Dump (the requested range: "
- + "[" + start + ", " + end+")" + ", idx min: " + idxReader.getFirstKey() + ", idx max: "
- + idxReader.getLastKey());
- throw ioe;
- }
- }
-
- if (startOffset == -1) {
- throw new IllegalStateException("startOffset " + startOffset + " is negative \n" +
- "State Dump (the requested range: "
- + "[" + start + ", " + end+")" + ", idx min: " + idxReader.getFirstKey() + ", idx max: "
- + idxReader.getLastKey());
- }
-
- // if greater than indexed values
- if (last || (endOffset == -1 && comp.compare(idxReader.getLastKey(), end) < 0)) {
- endOffset = data.length();
- }
-
- FileChunk chunk = new FileChunk(data, startOffset, endOffset - startOffset);
- LOG.info("Retrieve File Chunk: " + chunk);
- return chunk;
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/bc478ba8/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/FileAccessForbiddenException.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/FileAccessForbiddenException.java b/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/FileAccessForbiddenException.java
deleted file mode 100644
index 6c93e4f..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/FileAccessForbiddenException.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.worker.dataserver;
-
-import java.io.IOException;
-
-public class FileAccessForbiddenException extends IOException {
- private static final long serialVersionUID = -3383272565826389213L;
-
- public FileAccessForbiddenException() {
- }
-
- public FileAccessForbiddenException(String message) {
- super(message);
- }
-
- public FileAccessForbiddenException(Throwable cause) {
- super(cause);
- }
-
- public FileAccessForbiddenException(String message, Throwable cause) {
- super(message, cause);
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/bc478ba8/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/HttpDataServer.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/HttpDataServer.java b/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/HttpDataServer.java
deleted file mode 100644
index 523d6e1..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/HttpDataServer.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.worker.dataserver;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.net.NetUtils;
-import org.jboss.netty.bootstrap.ServerBootstrap;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.group.ChannelGroup;
-import org.jboss.netty.channel.group.ChannelGroupFuture;
-import org.jboss.netty.channel.group.DefaultChannelGroup;
-import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
-import org.apache.tajo.worker.dataserver.retriever.DataRetriever;
-
-import java.net.InetSocketAddress;
-import java.util.concurrent.Executors;
-
-public class HttpDataServer {
- private final static Log LOG = LogFactory.getLog(HttpDataServer.class);
-
- private final InetSocketAddress addr;
- private InetSocketAddress bindAddr;
- private ServerBootstrap bootstrap = null;
- private ChannelFactory factory = null;
- private ChannelGroup channelGroup = null;
-
- public HttpDataServer(final InetSocketAddress addr,
- final DataRetriever retriever) {
- this.addr = addr;
- this.factory = new NioServerSocketChannelFactory(
- Executors.newCachedThreadPool(), Executors.newCachedThreadPool(),
- Runtime.getRuntime().availableProcessors() * 2);
-
- // Configure the server.
- this.bootstrap = new ServerBootstrap(factory);
- // Set up the event pipeline factory.
- this.bootstrap.setPipelineFactory(
- new HttpDataServerPipelineFactory(retriever));
- this.channelGroup = new DefaultChannelGroup();
- }
-
- public HttpDataServer(String bindaddr, DataRetriever retriever) {
- this(NetUtils.createSocketAddr(bindaddr), retriever);
- }
-
- public void start() {
- // Bind and start to accept incoming connections.
- Channel channel = bootstrap.bind(addr);
- channelGroup.add(channel);
- this.bindAddr = (InetSocketAddress) channel.getLocalAddress();
- LOG.info("HttpDataServer starts up ("
- + this.bindAddr.getAddress().getHostAddress() + ":" + this.bindAddr.getPort()
- + ")");
- }
-
- public InetSocketAddress getBindAddress() {
- return this.bindAddr;
- }
-
- public void stop() {
- ChannelGroupFuture future = channelGroup.close();
- future.awaitUninterruptibly();
- factory.releaseExternalResources();
-
- LOG.info("HttpDataServer shutdown ("
- + this.bindAddr.getAddress().getHostAddress() + ":"
- + this.bindAddr.getPort() + ")");
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/bc478ba8/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/HttpDataServerHandler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/HttpDataServerHandler.java b/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/HttpDataServerHandler.java
deleted file mode 100644
index 6b9eea8..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/HttpDataServerHandler.java
+++ /dev/null
@@ -1,199 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.worker.dataserver;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.*;
-import org.jboss.netty.handler.codec.frame.TooLongFrameException;
-import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
-import org.jboss.netty.handler.codec.http.HttpRequest;
-import org.jboss.netty.handler.codec.http.HttpResponse;
-import org.jboss.netty.handler.codec.http.HttpResponseStatus;
-import org.jboss.netty.handler.ssl.SslHandler;
-import org.jboss.netty.handler.stream.ChunkedFile;
-import org.jboss.netty.util.CharsetUtil;
-import org.apache.tajo.worker.dataserver.retriever.DataRetriever;
-import org.apache.tajo.worker.dataserver.retriever.FileChunk;
-
-import java.io.*;
-import java.net.URLDecoder;
-
-import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
-import static org.jboss.netty.handler.codec.http.HttpHeaders.isKeepAlive;
-import static org.jboss.netty.handler.codec.http.HttpHeaders.setContentLength;
-import static org.jboss.netty.handler.codec.http.HttpMethod.GET;
-import static org.jboss.netty.handler.codec.http.HttpResponseStatus.*;
-import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;
-
-public class HttpDataServerHandler extends SimpleChannelUpstreamHandler {
- private final static Log LOG = LogFactory.getLog(HttpDataServer.class);
- private final DataRetriever retriever;
-
- public HttpDataServerHandler(DataRetriever retriever) {
- this.retriever = retriever;
- }
-
- @Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
- throws Exception {
- HttpRequest request = (HttpRequest) e.getMessage();
- if (request.getMethod() != GET) {
- sendError(ctx, METHOD_NOT_ALLOWED);
- return;
- }
-
- FileChunk [] file;
- try {
- file = retriever.handle(ctx, request);
- } catch (FileNotFoundException fnf) {
- LOG.error(fnf);
- sendError(ctx, NOT_FOUND);
- return;
- } catch (IllegalArgumentException iae) {
- LOG.error(iae);
- sendError(ctx, BAD_REQUEST);
- return;
- } catch (FileAccessForbiddenException fafe) {
- LOG.error(fafe);
- sendError(ctx, FORBIDDEN);
- return;
- } catch (IOException ioe) {
- LOG.error(ioe);
- sendError(ctx, INTERNAL_SERVER_ERROR);
- return;
- }
-
- // Write the content.
- Channel ch = e.getChannel();
- if (file == null) {
- HttpResponse response = new DefaultHttpResponse(HTTP_1_1, NO_CONTENT);
- ch.write(response);
- if (!isKeepAlive(request)) {
- ch.close();
- }
- } else {
- HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
- long totalSize = 0;
- for (FileChunk chunk : file) {
- totalSize += chunk.length();
- }
- setContentLength(response, totalSize);
-
- // Write the initial line and the header.
- ch.write(response);
-
- ChannelFuture writeFuture = null;
-
- for (FileChunk chunk : file) {
- writeFuture = sendFile(ctx, ch, chunk);
- if (writeFuture == null) {
- sendError(ctx, NOT_FOUND);
- return;
- }
- }
-
- // Decide whether to close the connection or not.
- if (!isKeepAlive(request)) {
- // Close the connection when the whole content is written out.
- writeFuture.addListener(ChannelFutureListener.CLOSE);
- }
- }
- }
-
- private ChannelFuture sendFile(ChannelHandlerContext ctx, Channel ch, FileChunk file) throws IOException {
- RandomAccessFile raf;
- try {
- raf = new RandomAccessFile(file.getFile(), "r");
- } catch (FileNotFoundException fnfe) {
- return null;
- }
-
- ChannelFuture writeFuture;
- if (ch.getPipeline().get(SslHandler.class) != null) {
- // Cannot use zero-copy with HTTPS.
- writeFuture = ch.write(new ChunkedFile(raf, file.startOffset(), file.length(), 8192));
- } else {
- // No encryption - use zero-copy.
- final FileRegion region = new DefaultFileRegion(raf.getChannel(), file.startOffset(), file.length());
- writeFuture = ch.write(region);
- writeFuture.addListener(new ChannelFutureListener() {
- public void operationComplete(ChannelFuture future) {
- region.releaseExternalResources();
- }
- });
- }
-
- return writeFuture;
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
- throws Exception {
- Channel ch = e.getChannel();
- Throwable cause = e.getCause();
- if (cause instanceof TooLongFrameException) {
- sendError(ctx, BAD_REQUEST);
- return;
- }
-
- cause.printStackTrace();
- if (ch.isConnected()) {
- sendError(ctx, INTERNAL_SERVER_ERROR);
- }
- }
-
- public static String sanitizeUri(String uri) {
- // Decode the path.
- try {
- uri = URLDecoder.decode(uri, "UTF-8");
- } catch (UnsupportedEncodingException e) {
- try {
- uri = URLDecoder.decode(uri, "ISO-8859-1");
- } catch (UnsupportedEncodingException e1) {
- throw new Error();
- }
- }
-
- // Convert file separators.
- uri = uri.replace('/', File.separatorChar);
-
- // Simplistic dumb security check.
- // You will have to do something serious in the production environment.
- if (uri.contains(File.separator + ".")
- || uri.contains("." + File.separator) || uri.startsWith(".")
- || uri.endsWith(".")) {
- return null;
- }
-
- // Convert to absolute path.
- return uri;
- }
-
- private void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) {
- HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status);
- response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8");
- response.setContent(ChannelBuffers.copiedBuffer(
- "Failure: " + status.toString() + "\r\n", CharsetUtil.UTF_8));
-
- // Close the connection as soon as the error message is sent.
- ctx.getChannel().write(response).addListener(ChannelFutureListener.CLOSE);
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/bc478ba8/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/HttpDataServerPipelineFactory.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/HttpDataServerPipelineFactory.java b/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/HttpDataServerPipelineFactory.java
deleted file mode 100644
index 0a47f6b..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/HttpDataServerPipelineFactory.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.worker.dataserver;
-
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
-import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
-import org.jboss.netty.handler.stream.ChunkedWriteHandler;
-import org.apache.tajo.worker.dataserver.retriever.DataRetriever;
-
-import static org.jboss.netty.channel.Channels.pipeline;
-
-public class HttpDataServerPipelineFactory implements ChannelPipelineFactory {
- private final DataRetriever ret;
-
- public HttpDataServerPipelineFactory(DataRetriever ret) {
- this.ret = ret;
- }
-
- public ChannelPipeline getPipeline() throws Exception {
- // Create a default pipeline implementation.
- ChannelPipeline pipeline = pipeline();
-
- // Uncomment the following line if you want HTTPS
- // SSLEngine engine =
- // SecureChatSslContextFactory.getServerContext().createSSLEngine();
- // engine.setUseClientMode(false);
- // pipeline.addLast("ssl", new SslHandler(engine));
-
- pipeline.addLast("decoder", new HttpRequestDecoder());
- //pipeline.addLast("aggregator", new HttpChunkAggregator(65536));
- pipeline.addLast("encoder", new HttpResponseEncoder());
- pipeline.addLast("chunkedWriter", new ChunkedWriteHandler());
- //pipeline.addLast("deflater", new HttpContentCompressor());
- pipeline.addLast("handler", new HttpDataServerHandler(ret));
- return pipeline;
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/bc478ba8/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/HttpUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/HttpUtil.java b/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/HttpUtil.java
deleted file mode 100644
index e688c39..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/HttpUtil.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.worker.dataserver;
-
-import com.google.common.collect.Maps;
-
-import java.io.UnsupportedEncodingException;
-import java.net.URI;
-import java.net.URLEncoder;
-import java.util.Map;
-
-public class HttpUtil {
- public static Map<String,String> getParams(URI uri) throws UnsupportedEncodingException {
- return getParamsFromQuery(uri.getQuery());
- }
-
- /**
- * It parses a query string into key/value pairs
- *
- * @param queryString decoded query string
- * @return key/value pairs parsed from a given query string
- * @throws UnsupportedEncodingException
- */
- public static Map<String, String> getParamsFromQuery(String queryString) throws UnsupportedEncodingException {
- String [] queries = queryString.split("&");
-
- Map<String,String> params = Maps.newHashMap();
- String [] param;
- for (String q : queries) {
- param = q.split("=");
- params.put(param[0], param[1]);
- }
-
- return params;
- }
-
- public static String buildQuery(Map<String,String> params) throws UnsupportedEncodingException {
- StringBuilder sb = new StringBuilder();
-
- boolean first = true;
- for (Map.Entry<String,String> param : params.entrySet()) {
- if (!first) {
- sb.append("&");
- }
- sb.append(URLEncoder.encode(param.getKey(), "UTF-8")).
- append("=").
- append(URLEncoder.encode(param.getValue(), "UTF-8"));
- first = false;
- }
-
- return sb.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/bc478ba8/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/AdvancedDataRetriever.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/AdvancedDataRetriever.java b/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/AdvancedDataRetriever.java
deleted file mode 100644
index 9c15d0c..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/AdvancedDataRetriever.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.worker.dataserver.retriever;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.TaskAttemptId;
-import org.apache.tajo.TaskId;
-import org.apache.tajo.util.TajoIdUtils;
-import org.apache.tajo.worker.dataserver.FileAccessForbiddenException;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.handler.codec.http.HttpRequest;
-import org.jboss.netty.handler.codec.http.QueryStringDecoder;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-public class AdvancedDataRetriever implements DataRetriever {
- private final Log LOG = LogFactory.getLog(AdvancedDataRetriever.class);
- private final Map<String, RetrieverHandler> handlerMap = Maps.newConcurrentMap();
-
- public AdvancedDataRetriever() {
- }
-
- public void register(TaskAttemptId id, RetrieverHandler handler) {
- synchronized (handlerMap) {
- if (!handlerMap.containsKey(id.toString())) {
- handlerMap.put(id.toString(), handler);
- }
- }
- }
-
- public void unregister(TaskAttemptId id) {
- synchronized (handlerMap) {
- if (handlerMap.containsKey(id.toString())) {
- handlerMap.remove(id.toString());
- }
- }
- }
-
- /* (non-Javadoc)
- * @see org.apache.tajo.worker.dataserver.retriever.DataRetriever#handle(org.jboss.netty.channel.ChannelHandlerContext, org.jboss.netty.handler.codec.http.HttpRequest)
- */
- @Override
- public FileChunk [] handle(ChannelHandlerContext ctx, HttpRequest request)
- throws IOException {
-
- final Map<String, List<String>> params =
- new QueryStringDecoder(request.getUri()).getParameters();
-
- if (!params.containsKey("qid")) {
- throw new FileNotFoundException("No such qid: " + params.containsKey("qid"));
- }
-
- if (params.containsKey("sid")) {
- List<FileChunk> chunks = Lists.newArrayList();
- List<String> qids = splitMaps(params.get("qid"));
- for (String qid : qids) {
- String[] ids = qid.split("_");
- ExecutionBlockId suid = TajoIdUtils.createExecutionBlockId(params.get("sid").get(0));
- TaskId quid = new TaskId(suid, Integer.parseInt(ids[0]));
- TaskAttemptId attemptId = new TaskAttemptId(quid,
- Integer.parseInt(ids[1]));
- RetrieverHandler handler = handlerMap.get(attemptId.toString());
- FileChunk chunk = handler.get(params);
- chunks.add(chunk);
- }
- return chunks.toArray(new FileChunk[chunks.size()]);
- } else {
- RetrieverHandler handler = handlerMap.get(params.get("qid").get(0));
- FileChunk chunk = handler.get(params);
- if (chunk == null) {
- if (params.containsKey("qid")) { // if there is no content corresponding to the query
- return null;
- } else { // if there is no
- throw new FileNotFoundException("No such a file corresponding to " + params.get("qid"));
- }
- }
-
- File file = chunk.getFile();
- if (file.isHidden() || !file.exists()) {
- throw new FileNotFoundException("No such file: " + file.getAbsolutePath());
- }
- if (!file.isFile()) {
- throw new FileAccessForbiddenException(file.getAbsolutePath() + " is not file");
- }
-
- return new FileChunk[] {chunk};
- }
- }
-
- private List<String> splitMaps(List<String> qids) {
- if (null == qids) {
- LOG.error("QueryId is EMPTY");
- return null;
- }
-
- final List<String> ret = new ArrayList<String>();
- for (String qid : qids) {
- Collections.addAll(ret, qid.split(","));
- }
- return ret;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/bc478ba8/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/DataRetriever.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/DataRetriever.java b/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/DataRetriever.java
deleted file mode 100644
index b26ba74..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/DataRetriever.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.worker.dataserver.retriever;
-
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.handler.codec.http.HttpRequest;
-
-import java.io.IOException;
-
-public interface DataRetriever {
- FileChunk [] handle(ChannelHandlerContext ctx, HttpRequest request)
- throws IOException;
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/bc478ba8/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/DirectoryRetriever.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/DirectoryRetriever.java b/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/DirectoryRetriever.java
deleted file mode 100644
index 62dabbd..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/DirectoryRetriever.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.worker.dataserver.retriever;
-
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.handler.codec.http.HttpRequest;
-import org.apache.tajo.worker.dataserver.FileAccessForbiddenException;
-import org.apache.tajo.worker.dataserver.HttpDataServerHandler;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-
-public class DirectoryRetriever implements DataRetriever {
- public String baseDir;
-
- public DirectoryRetriever(String baseDir) {
- this.baseDir = baseDir;
- }
-
- @Override
- public FileChunk [] handle(ChannelHandlerContext ctx, HttpRequest request)
- throws IOException {
- final String path = HttpDataServerHandler.sanitizeUri(request.getUri());
- if (path == null) {
- throw new IllegalArgumentException("Wrong path: " +path);
- }
-
- File file = new File(baseDir, path);
- if (file.isHidden() || !file.exists()) {
- throw new FileNotFoundException("No such file: " + baseDir + "/" + path);
- }
- if (!file.isFile()) {
- throw new FileAccessForbiddenException("No such file: "
- + baseDir + "/" + path);
- }
-
- return new FileChunk[] {new FileChunk(file, 0, file.length())};
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/bc478ba8/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/FileChunk.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/FileChunk.java b/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/FileChunk.java
deleted file mode 100644
index 4f11168..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/FileChunk.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.worker.dataserver.retriever;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-
-public class FileChunk {
- private final File file;
- private final long startOffset;
- private final long length;
-
- public FileChunk(File file, long startOffset, long length) throws FileNotFoundException {
- this.file = file;
- this.startOffset = startOffset;
- this.length = length;
- }
-
- public File getFile() {
- return this.file;
- }
-
- public long startOffset() {
- return this.startOffset;
- }
-
- public long length() {
- return this.length;
- }
-
- public String toString() {
- return " (start=" + startOffset() + ", length=" + length + ") "
- + file.getAbsolutePath();
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/bc478ba8/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/RetrieverHandler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/RetrieverHandler.java b/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/RetrieverHandler.java
deleted file mode 100644
index e5479cc..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/RetrieverHandler.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.worker.dataserver.retriever;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-public interface RetrieverHandler {
- /**
- *
- * @param kvs url-decoded key/value pairs
- * @return a desired part of a file
- * @throws IOException
- */
- public FileChunk get(Map<String, List<String>> kvs) throws IOException;
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/bc478ba8/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
index d3ab1fd..6c606b1 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
@@ -19,9 +19,7 @@
package org.apache.tajo.engine.planner.physical;
import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -44,7 +42,6 @@ import org.apache.tajo.engine.planner.enforce.Enforcer;
import org.apache.tajo.engine.planner.global.DataChannel;
import org.apache.tajo.engine.planner.global.MasterPlan;
import org.apache.tajo.engine.query.QueryContext;
-import org.apache.tajo.session.Session;
import org.apache.tajo.plan.LogicalOptimizer;
import org.apache.tajo.plan.LogicalPlan;
import org.apache.tajo.plan.LogicalPlanner;
@@ -53,27 +50,22 @@ import org.apache.tajo.plan.expr.AggregationFunctionCallEval;
import org.apache.tajo.plan.logical.*;
import org.apache.tajo.plan.serder.PlanProto.ShuffleType;
import org.apache.tajo.plan.util.PlannerUtil;
+import org.apache.tajo.session.Session;
import org.apache.tajo.storage.*;
-import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder;
import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.storage.fragment.Fragment;
-import org.apache.tajo.storage.index.bst.BSTIndex;
import org.apache.tajo.unit.StorageUnit;
import org.apache.tajo.util.CommonTestingUtil;
import org.apache.tajo.util.KeyValueSet;
import org.apache.tajo.util.TUtil;
-import org.apache.tajo.worker.RangeRetrieverHandler;
import org.apache.tajo.worker.TaskAttemptContext;
-import org.apache.tajo.worker.dataserver.retriever.FileChunk;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
-import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
import java.util.Set;
import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
@@ -1044,99 +1036,6 @@ public class TestPhysicalPlanner {
};
@Test
- public final void testIndexedStoreExec() throws IOException, PlanningException {
- FileFragment[] frags = FileStorageManager.splitNG(conf, "default.employee", employee.getMeta(),
- new Path(employee.getPath()), Integer.MAX_VALUE);
-
- Path workDir = CommonTestingUtil.getTestDir("target/test-data/testIndexedStoreExec");
- TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
- LocalTajoTestingUtility.newTaskAttemptId(masterPlan),
- new FileFragment[] {frags[0]}, workDir);
- ctx.setEnforcer(new Enforcer());
- Expr context = analyzer.parse(SORT_QUERY[0]);
- LogicalPlan plan = planner.createPlan(defaultContext, context);
- LogicalNode rootNode = optimizer.optimize(plan);
-
- SortNode sortNode = PlannerUtil.findTopNode(rootNode, NodeType.SORT);
- DataChannel channel = new DataChannel(masterPlan.newExecutionBlockId(), masterPlan.newExecutionBlockId(),
- ShuffleType.RANGE_SHUFFLE);
- channel.setShuffleKeys(PlannerUtil.sortSpecsToSchema(sortNode.getSortKeys()).toArray());
- ctx.setDataChannel(channel);
-
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
- PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
-
- Tuple tuple;
- exec.init();
- exec.next();
- exec.close();
-
- Schema keySchema = new Schema();
- keySchema.addColumn("?empId", Type.INT4);
- SortSpec[] sortSpec = new SortSpec[1];
- sortSpec[0] = new SortSpec(keySchema.getColumn(0), true, false);
- BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortSpec);
- BSTIndex bst = new BSTIndex(conf);
- BSTIndex.BSTIndexReader reader = bst.getIndexReader(new Path(workDir, "output/index"),
- keySchema, comp);
- reader.open();
- Path outputPath = StorageUtil.concatPath(workDir, "output", "output");
- TableMeta meta = CatalogUtil.newTableMeta(channel.getStoreType(), new KeyValueSet());
- SeekableScanner scanner =
- FileStorageManager.getSeekableScanner(conf, meta, exec.getSchema(), outputPath);
- scanner.init();
-
- int cnt = 0;
-
- while(scanner.next() != null) {
- cnt++;
- }
- scanner.reset();
-
- assertEquals(100 ,cnt);
-
- Tuple keytuple = new VTuple(1);
- for(int i = 1 ; i < 100 ; i ++) {
- keytuple.put(0, DatumFactory.createInt4(i));
- long offsets = reader.find(keytuple);
- scanner.seek(offsets);
- tuple = scanner.next();
-
- assertTrue("[seek check " + (i) + " ]", ("name_" + i).equals(tuple.get(0).asChars()));
- assertTrue("[seek check " + (i) + " ]" , i == tuple.get(1).asInt4());
- }
-
-
- // The below is for testing RangeRetrieverHandler.
- RowStoreEncoder encoder = RowStoreUtil.createEncoder(keySchema);
- RangeRetrieverHandler handler = new RangeRetrieverHandler(
- new File(new Path(workDir, "output").toUri()), keySchema, comp);
- Map<String,List<String>> kvs = Maps.newHashMap();
- Tuple startTuple = new VTuple(1);
- startTuple.put(0, DatumFactory.createInt4(50));
- kvs.put("start", Lists.newArrayList(
- new String(Base64.encodeBase64(
- encoder.toBytes(startTuple), false))));
- Tuple endTuple = new VTuple(1);
- endTuple.put(0, DatumFactory.createInt4(80));
- kvs.put("end", Lists.newArrayList(
- new String(Base64.encodeBase64(
- encoder.toBytes(endTuple), false))));
- FileChunk chunk = handler.get(kvs);
-
- scanner.seek(chunk.startOffset());
- keytuple = scanner.next();
- assertEquals(50, keytuple.get(1).asInt4());
-
- long endOffset = chunk.startOffset() + chunk.length();
- while((keytuple = scanner.next()) != null && scanner.getNextOffset() <= endOffset) {
- assertTrue(keytuple.get(1).asInt4() <= 80);
- }
-
- scanner.close();
- }
-
- @Test
public final void testSortEnforcer() throws IOException, PlanningException {
FileFragment[] frags = FileStorageManager.splitNG(conf, "default.employee", employee.getMeta(),
new Path(employee.getPath()), Integer.MAX_VALUE);
http://git-wip-us.apache.org/repos/asf/tajo/blob/bc478ba8/tajo-core/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java b/tajo-core/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
deleted file mode 100644
index 200ba31..0000000
--- a/tajo-core/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
+++ /dev/null
@@ -1,381 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.worker;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import org.apache.commons.codec.binary.Base64;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.LocalTajoTestingUtility;
-import org.apache.tajo.TajoConstants;
-import org.apache.tajo.TajoTestingCluster;
-import org.apache.tajo.algebra.Expr;
-import org.apache.tajo.catalog.*;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
-import org.apache.tajo.common.TajoDataTypes.Type;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.engine.parser.SQLAnalyzer;
-import org.apache.tajo.engine.planner.*;
-import org.apache.tajo.engine.planner.enforce.Enforcer;
-import org.apache.tajo.plan.LogicalOptimizer;
-import org.apache.tajo.plan.LogicalPlan;
-import org.apache.tajo.plan.LogicalPlanner;
-import org.apache.tajo.plan.util.PlannerUtil;
-import org.apache.tajo.plan.logical.LogicalNode;
-import org.apache.tajo.engine.planner.physical.ExternalSortExec;
-import org.apache.tajo.engine.planner.physical.PhysicalExec;
-import org.apache.tajo.engine.planner.physical.ProjectionExec;
-import org.apache.tajo.engine.planner.physical.RangeShuffleFileWriteExec;
-import org.apache.tajo.engine.query.QueryContext;
-import org.apache.tajo.storage.*;
-import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.storage.index.bst.BSTIndex;
-import org.apache.tajo.util.CommonTestingUtil;
-import org.apache.tajo.util.KeyValueSet;
-import org.apache.tajo.worker.dataserver.retriever.FileChunk;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-public class TestRangeRetrieverHandler {
- private TajoTestingCluster util;
- private TajoConf conf;
- private CatalogService catalog;
- private SQLAnalyzer analyzer;
- private LogicalPlanner planner;
- private LogicalOptimizer optimizer;
- private StorageManager sm;
- private Schema schema;
- private static int TEST_TUPLE = 10000;
- private FileSystem fs;
- private Path testDir;
-
- @Before
- public void setUp() throws Exception {
- util = new TajoTestingCluster();
- conf = util.getConfiguration();
- testDir = CommonTestingUtil.getTestDir();
- fs = testDir.getFileSystem(conf);
- util.startCatalogCluster();
- catalog = util.getMiniCatalogCluster().getCatalog();
- catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString());
- catalog.createDatabase(TajoConstants.DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
- sm = StorageManager.getFileStorageManager(conf, testDir);
-
- analyzer = new SQLAnalyzer();
- planner = new LogicalPlanner(catalog);
- optimizer = new LogicalOptimizer(conf);
-
- schema = new Schema();
- schema.addColumn("empid", Type.INT4);
- schema.addColumn("age", Type.INT4);
- }
-
- @After
- public void tearDown() {
- util.shutdownCatalogCluster();
- }
-
- public String [] SORT_QUERY = {
- "select empId, age from employee order by empId, age",
- "select empId, age from employee order by empId desc, age desc"
- };
-
- @Test
- public void testGet() throws Exception {
- Tuple firstTuple = null;
- Tuple lastTuple;
-
- TableMeta employeeMeta = CatalogUtil.newTableMeta(StoreType.CSV);
-
- Path tableDir = StorageUtil.concatPath(testDir, "testGet", "table.csv");
- fs.mkdirs(tableDir.getParent());
- Appender appender = ((FileStorageManager)sm).getAppender(employeeMeta, schema, tableDir);
- appender.init();
-
- Tuple tuple = new VTuple(schema.size());
- for (int i = 0; i < TEST_TUPLE; i++) {
- tuple.put(
- new Datum[] {
- DatumFactory.createInt4(i),
- DatumFactory.createInt4(i + 5)
- });
- appender.addTuple(tuple);
-
- if (firstTuple == null) {
- firstTuple = new VTuple(tuple);
- }
- }
- lastTuple = new VTuple(tuple);
- appender.flush();
- appender.close();
-
- TableDesc employee = new TableDesc(
- CatalogUtil.buildFQName(TajoConstants.DEFAULT_DATABASE_NAME, "employee"), schema, employeeMeta,
- tableDir.toUri());
- catalog.createTable(employee);
-
- FileFragment[] frags = FileStorageManager.splitNG(conf, "default.employee", employeeMeta, tableDir, Integer.MAX_VALUE);
-
- TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
- LocalTajoTestingUtility.newTaskAttemptId(),
- new FileFragment[] {frags[0]}, testDir);
- ctx.setEnforcer(new Enforcer());
- Expr expr = analyzer.parse(SORT_QUERY[0]);
- LogicalPlan plan = planner.createPlan(LocalTajoTestingUtility.createDummyContext(conf), expr);
- LogicalNode rootNode = optimizer.optimize(plan);
-
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
- PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
-
- ExternalSortExec sort = null;
- if (exec instanceof ProjectionExec) {
- ProjectionExec projExec = (ProjectionExec) exec;
- sort = projExec.getChild();
- } else if (exec instanceof ExternalSortExec) {
- sort = (ExternalSortExec) exec;
- } else {
- assertTrue(false);
- }
-
- SortSpec[] sortSpecs = sort.getPlan().getSortKeys();
- RangeShuffleFileWriteExec idxStoreExec = new RangeShuffleFileWriteExec(ctx, sort, sort.getSchema(),
- sort.getSchema(), sortSpecs);
-
- exec = idxStoreExec;
- exec.init();
- exec.next();
- exec.close();
-
- Schema keySchema = PlannerUtil.sortSpecsToSchema(sortSpecs);
- BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortSpecs);
- BSTIndex bst = new BSTIndex(conf);
- BSTIndex.BSTIndexReader reader = bst.getIndexReader(
- new Path(testDir, "output/index"), keySchema, comp);
- reader.open();
-
- TableMeta meta = CatalogUtil.newTableMeta(StoreType.RAW, new KeyValueSet());
- SeekableScanner scanner = FileStorageManager.getSeekableScanner(conf, meta, schema,
- StorageUtil.concatPath(testDir, "output", "output"));
-
- scanner.init();
- int cnt = 0;
- while(scanner.next() != null) {
- cnt++;
- }
- scanner.reset();
-
- assertEquals(TEST_TUPLE ,cnt);
-
- Tuple keytuple = new VTuple(2);
- for(int i = 1 ; i < TEST_TUPLE ; i ++) {
- keytuple.put(0, DatumFactory.createInt4(i));
- keytuple.put(1, DatumFactory.createInt4(i + 5));
- long offsets = reader.find(keytuple);
- scanner.seek(offsets);
- tuple = scanner.next();
- assertTrue("[seek check " + (i) + " ]" , i == tuple.get(0).asInt4());
- //assertTrue("[seek check " + (i) + " ]" , ("name_" + i).equals(tuple.get(1).asChars()));
- }
-
- TupleRange totalRange = new TupleRange(sortSpecs, firstTuple, lastTuple);
- UniformRangePartition partitioner = new UniformRangePartition(totalRange, sortSpecs, true);
- TupleRange [] partitions = partitioner.partition(7);
-
- // The below is for testing RangeRetrieverHandler.
- RangeRetrieverHandler handler = new RangeRetrieverHandler(
- new File((new Path(testDir, "output")).toUri()), keySchema, comp);
-
- List<Long []> offsets = new ArrayList<Long []>();
-
- for (int i = 0; i < partitions.length; i++) {
- FileChunk chunk = getFileChunk(handler, keySchema, partitions[i], i == (partitions.length - 1));
- offsets.add(new Long[] {chunk.startOffset(), chunk.length()});
- }
- scanner.close();
-
- Long[] previous = null;
- for (Long [] offset : offsets) {
- if (offset[0] == 0 && previous == null) {
- previous = offset;
- continue;
- }
- assertTrue(previous[0] + previous[1] == offset[0]);
- previous = offset;
- }
- long fileLength = new File((new Path(testDir, "index").toUri())).length();
- assertTrue(previous[0] + previous[1] == fileLength);
- }
-
- @Test
- public void testGetFromDescendingOrder() throws Exception {
- Tuple firstTuple = null;
- Tuple lastTuple;
-
- TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV);
- Path tablePath = StorageUtil.concatPath(testDir, "testGetFromDescendingOrder", "table.csv");
- fs.mkdirs(tablePath.getParent());
- Appender appender = ((FileStorageManager)sm).getAppender(meta, schema, tablePath);
- appender.init();
- Tuple tuple = new VTuple(schema.size());
- for (int i = (TEST_TUPLE - 1); i >= 0 ; i--) {
- tuple.put(
- new Datum[] {
- DatumFactory.createInt4(i),
- DatumFactory.createInt4(i + 5)
- });
- appender.addTuple(tuple);
-
- if (firstTuple == null) {
- firstTuple = new VTuple(tuple);
- }
- }
- lastTuple = new VTuple(tuple);
- appender.flush();
- appender.close();
-
- TableDesc employee = new TableDesc(
- CatalogUtil.buildFQName(TajoConstants.DEFAULT_DATABASE_NAME, "employee"), schema, meta, tablePath.toUri());
- catalog.createTable(employee);
-
- FileFragment[] frags = FileStorageManager.splitNG(conf, "default.employee", meta, tablePath, Integer.MAX_VALUE);
-
- TaskAttemptContext
- ctx = new TaskAttemptContext(new QueryContext(conf),
- LocalTajoTestingUtility.newTaskAttemptId(),
- new FileFragment[] {frags[0]}, testDir);
- ctx.setEnforcer(new Enforcer());
- Expr expr = analyzer.parse(SORT_QUERY[1]);
- LogicalPlan plan = planner.createPlan(LocalTajoTestingUtility.createDummyContext(conf), expr);
- LogicalNode rootNode = optimizer.optimize(plan);
-
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
- PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
-
- ExternalSortExec sort = null;
- if (exec instanceof ProjectionExec) {
- ProjectionExec projExec = (ProjectionExec) exec;
- sort = projExec.getChild();
- } else if (exec instanceof ExternalSortExec) {
- sort = (ExternalSortExec) exec;
- } else {
- assertTrue(false);
- }
-
- SortSpec[] sortSpecs = sort.getPlan().getSortKeys();
- RangeShuffleFileWriteExec idxStoreExec = new RangeShuffleFileWriteExec(ctx, sort,
- sort.getSchema(), sort.getSchema(), sortSpecs);
-
- exec = idxStoreExec;
- exec.init();
- exec.next();
- exec.close();
-
- Schema keySchema = PlannerUtil.sortSpecsToSchema(sortSpecs);
- BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortSpecs);
- BSTIndex bst = new BSTIndex(conf);
- BSTIndex.BSTIndexReader reader = bst.getIndexReader(
- new Path(testDir, "output/index"), keySchema, comp);
- reader.open();
- TableMeta outputMeta = CatalogUtil.newTableMeta(StoreType.RAW, new KeyValueSet());
- SeekableScanner scanner = FileStorageManager.getSeekableScanner(conf, outputMeta, schema,
- StorageUtil.concatPath(testDir, "output", "output"));
- scanner.init();
- int cnt = 0;
- while(scanner.next() != null) {
- cnt++;
- }
- scanner.reset();
-
- assertEquals(TEST_TUPLE ,cnt);
-
- Tuple keytuple = new VTuple(2);
- for(int i = (TEST_TUPLE - 1) ; i >= 0; i --) {
- keytuple.put(0, DatumFactory.createInt4(i));
- keytuple.put(1, DatumFactory.createInt4(i + 5));
- long offsets = reader.find(keytuple);
- scanner.seek(offsets);
- tuple = scanner.next();
- assertTrue("[seek check " + (i) + " ]" , i == tuple.get(0).asInt4());
- }
-
- TupleRange totalRange = new TupleRange(sortSpecs, firstTuple, lastTuple);
- UniformRangePartition partitioner = new UniformRangePartition(totalRange, sortSpecs, true);
- TupleRange [] partitions = partitioner.partition(25);
-
- File dataFile = new File((new Path(testDir, "output")).toUri());
-
- // The below is for testing RangeRetrieverHandler.
- RangeRetrieverHandler handler = new RangeRetrieverHandler(
- dataFile, keySchema, comp);
-
- List<Long []> offsets = new ArrayList<Long []>();
-
- for (int i = 0; i < partitions.length; i++) {
- FileChunk chunk = getFileChunk(handler, keySchema, partitions[i], i == 0);
- offsets.add(new Long[] {chunk.startOffset(), chunk.length()});
- }
- scanner.close();
-
- long fileLength = new File(dataFile, "data/data").length();
- Long[] previous = null;
- for (Long [] offset : offsets) {
- if (previous == null) {
- assertTrue(offset[0] + offset[1] == fileLength);
- previous = offset;
- continue;
- }
-
- assertTrue(offset[0] + offset[1] == previous[0]);
- previous = offset;
- }
- }
-
- private FileChunk getFileChunk(RangeRetrieverHandler handler, Schema keySchema,
- TupleRange range, boolean last) throws IOException {
- Map<String,List<String>> kvs = Maps.newHashMap();
- RowStoreEncoder encoder = RowStoreUtil.createEncoder(keySchema);
- kvs.put("start", Lists.newArrayList(
- new String(Base64.encodeBase64(
- encoder.toBytes(range.getStart()),
- false))));
- kvs.put("end", Lists.newArrayList(
- new String(Base64.encodeBase64(
- encoder.toBytes(range.getEnd()), false))));
-
- if (last) {
- kvs.put("final", Lists.newArrayList("true"));
- }
- return handler.get(kvs);
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/bc478ba8/tajo-core/src/test/java/org/apache/tajo/worker/dataserver/TestHttpDataServer.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/dataserver/TestHttpDataServer.java b/tajo-core/src/test/java/org/apache/tajo/worker/dataserver/TestHttpDataServer.java
deleted file mode 100644
index 25a2fbc..0000000
--- a/tajo-core/src/test/java/org/apache/tajo/worker/dataserver/TestHttpDataServer.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.worker.dataserver;
-
-import org.apache.hadoop.net.NetUtils;
-import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.LocalTajoTestingUtility;
-import org.apache.tajo.QueryIdFactory;
-import org.apache.tajo.TaskId;
-import org.apache.tajo.engine.planner.global.MasterPlan;
-import org.apache.tajo.util.CommonTestingUtil;
-import org.apache.tajo.worker.InterDataRetriever;
-import org.apache.tajo.worker.dataserver.retriever.DataRetriever;
-import org.apache.tajo.worker.dataserver.retriever.DirectoryRetriever;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.*;
-import java.net.InetSocketAddress;
-import java.net.URL;
-import java.util.Random;
-
-import static org.junit.Assert.assertTrue;
-
-public class TestHttpDataServer {
- private String TEST_DATA = "target/test-data/TestHttpDataServer";
-
- @Before
- public void setUp() throws Exception {
- CommonTestingUtil.getTestDir(TEST_DATA);
- }
-
- @Test
- public final void testHttpDataServer() throws Exception {
- Random rnd = new Random();
- FileWriter writer = new FileWriter(TEST_DATA+"/"+"testHttp");
- String watermark = "test_"+rnd.nextInt();
- writer.write(watermark+"\n");
- writer.flush();
- writer.close();
-
- DataRetriever ret = new DirectoryRetriever(TEST_DATA);
- HttpDataServer server = new HttpDataServer(
- NetUtils.createSocketAddr("127.0.0.1:0"), ret);
- server.start();
-
- InetSocketAddress addr = server.getBindAddress();
- URL url = new URL("http://127.0.0.1:"+addr.getPort()
- + "/testHttp");
- BufferedReader in = new BufferedReader(new InputStreamReader(
- url.openStream()));
- String line;
- boolean found = false;
- while ((line = in.readLine()) != null) {
- if (line.equals(watermark))
- found = true;
- }
- assertTrue(found);
- in.close();
- server.stop();
- }
-
- @Test
- public final void testInterDataRetriver() throws Exception {
- MasterPlan plan = new MasterPlan(LocalTajoTestingUtility.newQueryId(), null, null);
- ExecutionBlockId schid = plan.newExecutionBlockId();
- TaskId qid1 = QueryIdFactory.newTaskId(schid);
- TaskId qid2 = QueryIdFactory.newTaskId(schid);
-
- File qid1Dir = new File(TEST_DATA + "/" + qid1.toString() + "/out");
- qid1Dir.mkdirs();
- File qid2Dir = new File(TEST_DATA + "/" + qid2.toString() + "/out");
- qid2Dir.mkdirs();
-
- Random rnd = new Random();
- FileWriter writer = new FileWriter(qid1Dir+"/"+"testHttp");
- String watermark1 = "test_"+rnd.nextInt();
- writer.write(watermark1);
- writer.flush();
- writer.close();
-
- writer = new FileWriter(qid2Dir+"/"+"testHttp");
- String watermark2 = "test_"+rnd.nextInt();
- writer.write(watermark2);
- writer.flush();
- writer.close();
-
- InterDataRetriever ret = new InterDataRetriever();
- HttpDataServer server = new HttpDataServer(
- NetUtils.createSocketAddr("127.0.0.1:0"), ret);
- server.start();
-
- ret.register(qid1, qid1Dir.getPath());
- ret.register(qid2, qid2Dir.getPath());
-
- InetSocketAddress addr = server.getBindAddress();
-
- assertDataRetrival(qid1, addr.getPort(), watermark1);
- assertDataRetrival(qid2, addr.getPort(), watermark2);
-
- server.stop();
- }
-
- @Test(expected = FileNotFoundException.class)
- public final void testNoSuchFile() throws Exception {
- MasterPlan plan = new MasterPlan(LocalTajoTestingUtility.newQueryId(), null, null);
- ExecutionBlockId schid = plan.newExecutionBlockId();
- TaskId qid1 = QueryIdFactory.newTaskId(schid);
- TaskId qid2 = QueryIdFactory.newTaskId(schid);
-
- File qid1Dir = new File(TEST_DATA + "/" + qid1.toString() + "/out");
- qid1Dir.mkdirs();
- File qid2Dir = new File(TEST_DATA + "/" + qid2.toString() + "/out");
- qid2Dir.mkdirs();
-
- Random rnd = new Random();
- FileWriter writer = new FileWriter(qid1Dir+"/"+"testHttp");
- String watermark1 = "test_"+rnd.nextInt();
- writer.write(watermark1);
- writer.flush();
- writer.close();
-
- writer = new FileWriter(qid2Dir+"/"+"testHttp");
- String watermark2 = "test_"+rnd.nextInt();
- writer.write(watermark2);
- writer.flush();
- writer.close();
-
- InterDataRetriever ret = new InterDataRetriever();
- HttpDataServer server = new HttpDataServer(
- NetUtils.createSocketAddr("127.0.0.1:0"), ret);
- server.start();
-
- ret.register(qid1, qid1Dir.getPath());
- InetSocketAddress addr = server.getBindAddress();
- assertDataRetrival(qid1, addr.getPort(), watermark1);
- ret.unregister(qid1);
- assertDataRetrival(qid1, addr.getPort(), watermark1);
- }
-
- private static void assertDataRetrival(TaskId id, int port,
- String watermark) throws IOException {
- URL url = new URL("http://127.0.0.1:"+port
- + "/?qid=" + id.toString() + "&fn=testHttp");
- BufferedReader in = new BufferedReader(new InputStreamReader(
- url.openStream()));
- String line;
- boolean found = false;
- while ((line = in.readLine()) != null) {
- if (line.equals(watermark))
- found = true;
- }
- assertTrue(found);
- in.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/bc478ba8/tajo-core/src/test/java/org/apache/tajo/worker/dataserver/TestHttpUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/dataserver/TestHttpUtil.java b/tajo-core/src/test/java/org/apache/tajo/worker/dataserver/TestHttpUtil.java
deleted file mode 100644
index bb2eb82..0000000
--- a/tajo-core/src/test/java/org/apache/tajo/worker/dataserver/TestHttpUtil.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.worker.dataserver;
-
-import com.google.common.collect.Maps;
-import org.junit.Test;
-
-import java.io.UnsupportedEncodingException;
-import java.net.URI;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-
-public class TestHttpUtil {
- private URI uri = URI.create("http://127.0.0.1:80/?key1=val1&key2=val2");
-
- @Test
- public void testGetParams() throws UnsupportedEncodingException {
- Map<String,String> params = HttpUtil.getParamsFromQuery(uri.getQuery());
- assertEquals(2, params.size());
- assertEquals("val1", params.get("key1"));
- assertEquals("val2", params.get("key2"));
- }
-
- @Test
- public void testBuildQuery() throws UnsupportedEncodingException {
- Map<String,String> params = Maps.newTreeMap();
- params.put("key1", "val1");
- params.put("key2", "val2");
- String query = HttpUtil.buildQuery(params);
- assertEquals(uri.getQuery(), query);
- }
-}