You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/07/02 16:16:00 UTC
[06/51] [partial] TAJO-22: The package prefix should be
org.apache.tajo. (DaeMyung Kang via hyunsik)
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/tajo/worker/TaskRunner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/worker/TaskRunner.java b/tajo-core/tajo-core-backend/src/main/java/tajo/worker/TaskRunner.java
deleted file mode 100644
index 0fd534e..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/worker/TaskRunner.java
+++ /dev/null
@@ -1,394 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package tajo.worker;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalDirAllocator;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
-import org.apache.hadoop.yarn.service.AbstractService;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import tajo.QueryConf;
-import tajo.QueryUnitAttemptId;
-import tajo.SubQueryId;
-import tajo.TajoProtos.TaskAttemptState;
-import tajo.conf.TajoConf.ConfVars;
-import tajo.engine.MasterWorkerProtos.QueryUnitRequestProto;
-import tajo.engine.query.QueryUnitRequestImpl;
-import tajo.ipc.MasterWorkerProtocol;
-import tajo.ipc.MasterWorkerProtocol.MasterWorkerProtocolService;
-import tajo.ipc.MasterWorkerProtocol.MasterWorkerProtocolService.Interface;
-import tajo.rpc.CallFuture2;
-import tajo.rpc.NullCallback;
-import tajo.rpc.ProtoAsyncRpcClient;
-import tajo.util.TajoIdUtils;
-
-import java.net.InetSocketAddress;
-import java.net.URI;
-import java.security.PrivilegedExceptionAction;
-import java.util.Map;
-import java.util.concurrent.*;
-
-import static tajo.engine.MasterWorkerProtos.TaskFatalErrorReport;
-
-/**
- * The driver class for Tajo QueryUnit processing.
- */
-public class TaskRunner extends AbstractService {
- /** class logger */
- private static final Log LOG = LogFactory.getLog(TaskRunner.class);
-
- private QueryConf conf;
-
- private volatile boolean stopped = false;
-
- private final SubQueryId subQueryId;
- private ApplicationId appId;
- private final NodeId nodeId;
- private final ContainerId containerId;
-
- // Cluster Management
- private MasterWorkerProtocolService.Interface master;
-
- // for temporal or intermediate files
- private FileSystem localFS;
- // for input files
- private FileSystem defaultFS;
-
- private TajoQueryEngine queryEngine;
-
- // TODO - this should be configurable
- private final int coreNum = 4;
-
- // for Fetcher
- private final ExecutorService fetchLauncher =
- Executors.newFixedThreadPool(coreNum * 4);
- // It keeps all of the query unit attempts while a TaskRunner is running.
- private final Map<QueryUnitAttemptId, Task> tasks =
- new ConcurrentHashMap<QueryUnitAttemptId, Task>();
- private LocalDirAllocator lDirAllocator;
-
- // A thread to receive each assigned query unit and execute the query unit
- private Thread taskLauncher;
-
- // Contains the object references related for TaskRunner
- private WorkerContext workerContext;
- // for the doAs block
- private UserGroupInformation taskOwner;
-
- // for the local temporal dir
- private String baseDir;
- private Path baseDirPath;
-
- public TaskRunner(
- final SubQueryId subQueryId,
- final NodeId nodeId,
- UserGroupInformation taskOwner,
- Interface master, ContainerId containerId) {
- super(TaskRunner.class.getName());
- this.subQueryId = subQueryId;
- this.appId = subQueryId.getQueryId().getApplicationId();
- this.nodeId = nodeId;
- this.taskOwner = taskOwner;
- this.master = master;
- this.containerId = containerId;
- }
-
- @Override
- public void init(Configuration _conf) {
- this.conf = (QueryConf) _conf;
-
- try {
- this.workerContext = new WorkerContext();
-
- // initialize DFS and LocalFileSystems
- defaultFS = FileSystem.get(URI.create(conf.get("tajo.rootdir")),conf);
- localFS = FileSystem.getLocal(conf);
-
- // the base dir for an output dir
- baseDir = ConverterUtils.toString(appId)
- + "/output" + "/" + subQueryId.getId();
-
- // initialize LocalDirAllocator
- lDirAllocator = new LocalDirAllocator(ConfVars.TASK_LOCAL_DIR.varname);
-
- baseDirPath = localFS.makeQualified(lDirAllocator.getLocalPathForWrite(baseDir, conf));
- LOG.info("TaskRunner basedir is created (" + baseDir +")");
-
- // Setup QueryEngine according to the query plan
- // Here, we can setup row-based query engine or columnar query engine.
- this.queryEngine = new TajoQueryEngine(conf);
-
- Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownHook()));
- } catch (Throwable t) {
- LOG.error(t);
- }
-
- super.init(conf);
- }
-
- @Override
- public void start() {
- run();
- }
-
- @Override
- public void stop() {
- if (!isStopped()) {
- // If TaskRunner is stopped, all running or pending tasks will be marked as failed.
- for (Task task : tasks.values()) {
- if (task.getStatus() == TaskAttemptState.TA_PENDING ||
- task.getStatus() == TaskAttemptState.TA_RUNNING) {
- task.setState(TaskAttemptState.TA_FAILED);
- }
- }
-
- // If this flag become true, taskLauncher will be terminated.
- this.stopped = true;
-
- LOG.info("STOPPED: " + nodeId);
- synchronized (this) {
- notifyAll();
- }
- }
- }
-
- class WorkerContext {
- public QueryConf getConf() {
- return conf;
- }
-
- public String getNodeId() {
- return nodeId.toString();
- }
-
- public MasterWorkerProtocolService.Interface getMaster() {
- return master;
- }
-
- public FileSystem getLocalFS() {
- return localFS;
- }
-
- public FileSystem getDefaultFS() {
- return defaultFS;
- }
-
- public LocalDirAllocator getLocalDirAllocator() {
- return lDirAllocator;
- }
-
- public TajoQueryEngine getTQueryEngine() {
- return queryEngine;
- }
-
- public Map<QueryUnitAttemptId, Task> getTasks() {
- return tasks;
- }
-
- public Task getTask(QueryUnitAttemptId taskId) {
- return tasks.get(taskId);
- }
-
- public ExecutorService getFetchLauncher() {
- return fetchLauncher;
- }
-
- public Path getBaseDir() {
- return baseDirPath;
- }
- }
-
- static void fatalError(MasterWorkerProtocolService.Interface proxy,
- QueryUnitAttemptId taskAttemptId, String message) {
- TaskFatalErrorReport.Builder builder = TaskFatalErrorReport.newBuilder()
- .setId(taskAttemptId.getProto())
- .setErrorMessage(message);
- proxy.fatalError(null, builder.build(), NullCallback.get());
- }
-
- public void run() {
- LOG.info("TaskRunner startup");
-
- try {
-
- taskLauncher = new Thread(new Runnable() {
- @Override
- public void run() {
- int receivedNum = 0;
- CallFuture2<QueryUnitRequestProto> callFuture = null;
- QueryUnitRequestProto taskRequest = null;
-
- while(!stopped) {
- try {
- if (callFuture == null) {
- callFuture = new CallFuture2<QueryUnitRequestProto>();
- master.getTask(null, ((ContainerIdPBImpl) containerId).getProto(),
- callFuture);
- }
- try {
- // wait for an assigning task for 3 seconds
- taskRequest = callFuture.get(3, TimeUnit.SECONDS);
- } catch (TimeoutException te) {
- // if there has been no assigning task for a given period,
- // TaskRunner will retry to request an assigning task.
- LOG.error(te);
-
- continue;
- }
-
- if (taskRequest != null) {
- // QueryMaster can send the terminal signal to TaskRunner.
- // If TaskRunner receives the terminal signal, TaskRunner will be terminated
- // immediately.
- if (taskRequest.getShouldDie()) {
- LOG.info("received ShouldDie flag");
- stop();
-
- } else {
-
- LOG.info("Accumulated Received Task: " + (++receivedNum));
-
- QueryUnitAttemptId taskAttemptId = new QueryUnitAttemptId(taskRequest.getId());
- if (tasks.containsKey(taskAttemptId)) {
- fatalError(master, taskAttemptId, "Duplicate Task Attempt: " + taskAttemptId);
- continue;
- }
-
- LOG.info("Initializing: " + taskAttemptId);
- Task task = new Task(taskAttemptId, workerContext, master,
- new QueryUnitRequestImpl(taskRequest));
- tasks.put(taskAttemptId, task);
-
- task.init();
- if (task.hasFetchPhase()) {
- task.fetch(); // The fetch is performed in an asynchronous way.
- }
- // task.run() is a blocking call.
- task.run();
-
- callFuture = null;
- taskRequest = null;
- }
- }
- } catch (Throwable t) {
- LOG.error(t);
- }
- }
- }
- });
- taskLauncher.start();
- taskLauncher.join();
-
- } catch (Throwable t) {
- LOG.fatal("Unhandled exception. Starting shutdown.", t);
- } finally {
- for (Task t : tasks.values()) {
- if (t.getStatus() != TaskAttemptState.TA_SUCCEEDED) {
- t.abort();
- }
- }
- }
- }
-
- private class ShutdownHook implements Runnable {
- @Override
- public void run() {
- LOG.info("received SIGINT Signal");
- stop();
- }
- }
-
- /**
- * @return true if a stop has been requested.
- */
- public boolean isStopped() {
- return this.stopped;
- }
-
- /**
- * TaskRunner takes 5 arguments as follows:
- * <ol>
- * <li>1st: TaskRunnerListener hostname</li>
- * <li>2nd: TaskRunnerListener port</li>
- * <li>3nd: SubQueryId</li>
- * <li>4th: NodeId</li>
- * <li>5th: ContainerId</li>
- * </ol>
- */
- public static void main(String[] args) throws Exception {
- // Restore QueryConf
- final QueryConf conf = new QueryConf();
- conf.addResource(new Path(QueryConf.FILENAME));
-
- LOG.info("MiniTajoYarn NM Local Dir: " + conf.get(ConfVars.TASK_LOCAL_DIR.varname));
- LOG.info("OUTPUT DIR: " + conf.getOutputPath());
- LOG.info("Tajo Root Dir: " + conf.get("tajo.rootdir"));
-
- UserGroupInformation.setConfiguration(conf);
-
- // TaskRunnerListener's address
- String host = args[0];
- int port = Integer.parseInt(args[1]);
- final InetSocketAddress masterAddr =
- NetUtils.createSocketAddrForHost(host, port);
-
- // SubQueryId from String
- final SubQueryId subQueryId = TajoIdUtils.newSubQueryId(args[2]);
- // NodeId has a form of hostname:port.
- NodeId nodeId = ConverterUtils.toNodeId(args[3]);
- ContainerId containerId = ConverterUtils.toContainerId(args[4]);
-
- // TODO - 'load credential' should be implemented
- // Getting taskOwner
- UserGroupInformation taskOwner =
- UserGroupInformation.createRemoteUser(conf.getVar(ConfVars.QUERY_USERNAME));
- //taskOwner.addToken(token);
-
- // TaskRunnerListener RPC
- ProtoAsyncRpcClient client;
- MasterWorkerProtocolService.Interface master;
-
- // initialize MasterWorkerProtocol as an actual task owner.
- client =
- taskOwner.doAs(new PrivilegedExceptionAction<ProtoAsyncRpcClient>() {
- @Override
- public ProtoAsyncRpcClient run() throws Exception {
- return new ProtoAsyncRpcClient(MasterWorkerProtocol.class, masterAddr);
- }
- });
- master = client.getStub();
-
-
- TaskRunner taskRunner = new TaskRunner(subQueryId, nodeId, taskOwner, master, containerId);
- taskRunner.init(conf);
- taskRunner.start();
- client.close();
- LOG.info("TaskRunner (" + nodeId + ") main thread exiting");
- System.exit(0);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/tajo/worker/dataserver/FileAccessForbiddenException.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/worker/dataserver/FileAccessForbiddenException.java b/tajo-core/tajo-core-backend/src/main/java/tajo/worker/dataserver/FileAccessForbiddenException.java
deleted file mode 100644
index 5f0c772..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/worker/dataserver/FileAccessForbiddenException.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package tajo.worker.dataserver;
-
-import java.io.IOException;
-
-public class FileAccessForbiddenException extends IOException {
- private static final long serialVersionUID = -3383272565826389213L;
-
- public FileAccessForbiddenException() {
- }
-
- public FileAccessForbiddenException(String message) {
- super(message);
- }
-
- public FileAccessForbiddenException(Throwable cause) {
- super(cause);
- }
-
- public FileAccessForbiddenException(String message, Throwable cause) {
- super(message, cause);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/tajo/worker/dataserver/HttpDataServer.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/worker/dataserver/HttpDataServer.java b/tajo-core/tajo-core-backend/src/main/java/tajo/worker/dataserver/HttpDataServer.java
deleted file mode 100644
index a35922e..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/worker/dataserver/HttpDataServer.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package tajo.worker.dataserver;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.net.NetUtils;
-import org.jboss.netty.bootstrap.ServerBootstrap;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.group.ChannelGroup;
-import org.jboss.netty.channel.group.ChannelGroupFuture;
-import org.jboss.netty.channel.group.DefaultChannelGroup;
-import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
-import tajo.worker.dataserver.retriever.DataRetriever;
-
-import java.net.InetSocketAddress;
-import java.util.concurrent.Executors;
-
-public class HttpDataServer {
- private final static Log LOG = LogFactory.getLog(HttpDataServer.class);
-
- private final InetSocketAddress addr;
- private InetSocketAddress bindAddr;
- private ServerBootstrap bootstrap = null;
- private ChannelFactory factory = null;
- private ChannelGroup channelGroup = null;
-
- public HttpDataServer(final InetSocketAddress addr,
- final DataRetriever retriever) {
- this.addr = addr;
- this.factory = new NioServerSocketChannelFactory(
- Executors.newCachedThreadPool(), Executors.newCachedThreadPool(),
- Runtime.getRuntime().availableProcessors() * 2);
-
- // Configure the server.
- this.bootstrap = new ServerBootstrap(factory);
- // Set up the event pipeline factory.
- this.bootstrap.setPipelineFactory(
- new HttpDataServerPipelineFactory(retriever));
- this.channelGroup = new DefaultChannelGroup();
- }
-
- public HttpDataServer(String bindaddr, DataRetriever retriever) {
- this(NetUtils.createSocketAddr(bindaddr), retriever);
- }
-
- public void start() {
- // Bind and start to accept incoming connections.
- Channel channel = bootstrap.bind(addr);
- channelGroup.add(channel);
- this.bindAddr = (InetSocketAddress) channel.getLocalAddress();
- LOG.info("HttpDataServer starts up ("
- + this.bindAddr.getAddress().getHostAddress() + ":" + this.bindAddr.getPort()
- + ")");
- }
-
- public InetSocketAddress getBindAddress() {
- return this.bindAddr;
- }
-
- public void stop() {
- ChannelGroupFuture future = channelGroup.close();
- future.awaitUninterruptibly();
- factory.releaseExternalResources();
-
- LOG.info("HttpDataServer shutdown ("
- + this.bindAddr.getAddress().getHostAddress() + ":"
- + this.bindAddr.getPort() + ")");
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/tajo/worker/dataserver/HttpDataServerHandler.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/worker/dataserver/HttpDataServerHandler.java b/tajo-core/tajo-core-backend/src/main/java/tajo/worker/dataserver/HttpDataServerHandler.java
deleted file mode 100644
index 31ab6e1..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/worker/dataserver/HttpDataServerHandler.java
+++ /dev/null
@@ -1,199 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package tajo.worker.dataserver;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.*;
-import org.jboss.netty.handler.codec.frame.TooLongFrameException;
-import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
-import org.jboss.netty.handler.codec.http.HttpRequest;
-import org.jboss.netty.handler.codec.http.HttpResponse;
-import org.jboss.netty.handler.codec.http.HttpResponseStatus;
-import org.jboss.netty.handler.ssl.SslHandler;
-import org.jboss.netty.handler.stream.ChunkedFile;
-import org.jboss.netty.util.CharsetUtil;
-import tajo.worker.dataserver.retriever.DataRetriever;
-import tajo.worker.dataserver.retriever.FileChunk;
-
-import java.io.*;
-import java.net.URLDecoder;
-
-import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
-import static org.jboss.netty.handler.codec.http.HttpHeaders.isKeepAlive;
-import static org.jboss.netty.handler.codec.http.HttpHeaders.setContentLength;
-import static org.jboss.netty.handler.codec.http.HttpMethod.GET;
-import static org.jboss.netty.handler.codec.http.HttpResponseStatus.*;
-import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;
-
-public class HttpDataServerHandler extends SimpleChannelUpstreamHandler {
- private final static Log LOG = LogFactory.getLog(HttpDataServer.class);
- private final DataRetriever retriever;
-
- public HttpDataServerHandler(DataRetriever retriever) {
- this.retriever = retriever;
- }
-
- @Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
- throws Exception {
- HttpRequest request = (HttpRequest) e.getMessage();
- if (request.getMethod() != GET) {
- sendError(ctx, METHOD_NOT_ALLOWED);
- return;
- }
-
- FileChunk [] file;
- try {
- file = retriever.handle(ctx, request);
- } catch (FileNotFoundException fnf) {
- LOG.error(fnf);
- sendError(ctx, NOT_FOUND);
- return;
- } catch (IllegalArgumentException iae) {
- LOG.error(iae);
- sendError(ctx, BAD_REQUEST);
- return;
- } catch (FileAccessForbiddenException fafe) {
- LOG.error(fafe);
- sendError(ctx, FORBIDDEN);
- return;
- } catch (IOException ioe) {
- LOG.error(ioe);
- sendError(ctx, INTERNAL_SERVER_ERROR);
- return;
- }
-
- // Write the content.
- Channel ch = e.getChannel();
- if (file == null) {
- HttpResponse response = new DefaultHttpResponse(HTTP_1_1, NO_CONTENT);
- ch.write(response);
- if (!isKeepAlive(request)) {
- ch.close();
- }
- } else {
- HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
- long totalSize = 0;
- for (FileChunk chunk : file) {
- totalSize += chunk.length();
- }
- setContentLength(response, totalSize);
-
- // Write the initial line and the header.
- ch.write(response);
-
- ChannelFuture writeFuture = null;
-
- for (FileChunk chunk : file) {
- writeFuture = sendFile(ctx, ch, chunk);
- if (writeFuture == null) {
- sendError(ctx, NOT_FOUND);
- return;
- }
- }
-
- // Decide whether to close the connection or not.
- if (!isKeepAlive(request)) {
- // Close the connection when the whole content is written out.
- writeFuture.addListener(ChannelFutureListener.CLOSE);
- }
- }
- }
-
- private ChannelFuture sendFile(ChannelHandlerContext ctx, Channel ch, FileChunk file) throws IOException {
- RandomAccessFile raf;
- try {
- raf = new RandomAccessFile(file.getFile(), "r");
- } catch (FileNotFoundException fnfe) {
- return null;
- }
-
- ChannelFuture writeFuture;
- if (ch.getPipeline().get(SslHandler.class) != null) {
- // Cannot use zero-copy with HTTPS.
- writeFuture = ch.write(new ChunkedFile(raf, file.startOffset(), file.length(), 8192));
- } else {
- // No encryption - use zero-copy.
- final FileRegion region = new DefaultFileRegion(raf.getChannel(), file.startOffset(), file.length());
- writeFuture = ch.write(region);
- writeFuture.addListener(new ChannelFutureListener() {
- public void operationComplete(ChannelFuture future) {
- region.releaseExternalResources();
- }
- });
- }
-
- return writeFuture;
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
- throws Exception {
- Channel ch = e.getChannel();
- Throwable cause = e.getCause();
- if (cause instanceof TooLongFrameException) {
- sendError(ctx, BAD_REQUEST);
- return;
- }
-
- cause.printStackTrace();
- if (ch.isConnected()) {
- sendError(ctx, INTERNAL_SERVER_ERROR);
- }
- }
-
- public static String sanitizeUri(String uri) {
- // Decode the path.
- try {
- uri = URLDecoder.decode(uri, "UTF-8");
- } catch (UnsupportedEncodingException e) {
- try {
- uri = URLDecoder.decode(uri, "ISO-8859-1");
- } catch (UnsupportedEncodingException e1) {
- throw new Error();
- }
- }
-
- // Convert file separators.
- uri = uri.replace('/', File.separatorChar);
-
- // Simplistic dumb security check.
- // You will have to do something serious in the production environment.
- if (uri.contains(File.separator + ".")
- || uri.contains("." + File.separator) || uri.startsWith(".")
- || uri.endsWith(".")) {
- return null;
- }
-
- // Convert to absolute path.
- return uri;
- }
-
- private void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) {
- HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status);
- response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8");
- response.setContent(ChannelBuffers.copiedBuffer(
- "Failure: " + status.toString() + "\r\n", CharsetUtil.UTF_8));
-
- // Close the connection as soon as the error message is sent.
- ctx.getChannel().write(response).addListener(ChannelFutureListener.CLOSE);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/tajo/worker/dataserver/HttpDataServerPipelineFactory.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/worker/dataserver/HttpDataServerPipelineFactory.java b/tajo-core/tajo-core-backend/src/main/java/tajo/worker/dataserver/HttpDataServerPipelineFactory.java
deleted file mode 100644
index e4fd554..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/worker/dataserver/HttpDataServerPipelineFactory.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package tajo.worker.dataserver;
-
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
-import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
-import org.jboss.netty.handler.stream.ChunkedWriteHandler;
-import tajo.worker.dataserver.retriever.DataRetriever;
-
-import static org.jboss.netty.channel.Channels.pipeline;
-
-public class HttpDataServerPipelineFactory implements ChannelPipelineFactory {
- private final DataRetriever ret;
-
- public HttpDataServerPipelineFactory(DataRetriever ret) {
- this.ret = ret;
- }
-
- public ChannelPipeline getPipeline() throws Exception {
- // Create a default pipeline implementation.
- ChannelPipeline pipeline = pipeline();
-
- // Uncomment the following line if you want HTTPS
- // SSLEngine engine =
- // SecureChatSslContextFactory.getServerContext().createSSLEngine();
- // engine.setUseClientMode(false);
- // pipeline.addLast("ssl", new SslHandler(engine));
-
- pipeline.addLast("decoder", new HttpRequestDecoder());
- //pipeline.addLast("aggregator", new HttpChunkAggregator(65536));
- pipeline.addLast("encoder", new HttpResponseEncoder());
- pipeline.addLast("chunkedWriter", new ChunkedWriteHandler());
- //pipeline.addLast("deflater", new HttpContentCompressor());
- pipeline.addLast("handler", new HttpDataServerHandler(ret));
- return pipeline;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/tajo/worker/dataserver/HttpUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/worker/dataserver/HttpUtil.java b/tajo-core/tajo-core-backend/src/main/java/tajo/worker/dataserver/HttpUtil.java
deleted file mode 100644
index 4c18a0f..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/worker/dataserver/HttpUtil.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package tajo.worker.dataserver;
-
-import com.google.common.collect.Maps;
-
-import java.io.UnsupportedEncodingException;
-import java.net.URI;
-import java.net.URLEncoder;
-import java.util.Map;
-
-public class HttpUtil {
- public static Map<String,String> getParams(URI uri) throws UnsupportedEncodingException {
- return getParamsFromQuery(uri.getQuery());
- }
-
- /**
- * It parses a query string into key/value pairs
- *
- * @param queryString decoded query string
- * @return key/value pairs parsed from a given query string
- * @throws UnsupportedEncodingException
- */
- public static Map<String, String> getParamsFromQuery(String queryString) throws UnsupportedEncodingException {
- String [] queries = queryString.split("&");
-
- Map<String,String> params = Maps.newHashMap();
- String [] param;
- for (String q : queries) {
- param = q.split("=");
- params.put(param[0], param[1]);
- }
-
- return params;
- }
-
- public static String buildQuery(Map<String,String> params) throws UnsupportedEncodingException {
- StringBuilder sb = new StringBuilder();
-
- boolean first = true;
- for (Map.Entry<String,String> param : params.entrySet()) {
- if (!first) {
- sb.append("&");
- }
- sb.append(URLEncoder.encode(param.getKey(), "UTF-8")).
- append("=").
- append(URLEncoder.encode(param.getValue(), "UTF-8"));
- first = false;
- }
-
- return sb.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/tajo/worker/dataserver/retriever/AdvancedDataRetriever.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/worker/dataserver/retriever/AdvancedDataRetriever.java b/tajo-core/tajo-core-backend/src/main/java/tajo/worker/dataserver/retriever/AdvancedDataRetriever.java
deleted file mode 100644
index fef717b..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/worker/dataserver/retriever/AdvancedDataRetriever.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package tajo.worker.dataserver.retriever;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.handler.codec.http.HttpRequest;
-import org.jboss.netty.handler.codec.http.QueryStringDecoder;
-import tajo.QueryUnitAttemptId;
-import tajo.QueryUnitId;
-import tajo.SubQueryId;
-import tajo.util.TajoIdUtils;
-import tajo.worker.dataserver.FileAccessForbiddenException;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-public class AdvancedDataRetriever implements DataRetriever {
- private final Log LOG = LogFactory.getLog(AdvancedDataRetriever.class);
- private final Map<String, RetrieverHandler> handlerMap = Maps.newConcurrentMap();
-
- public AdvancedDataRetriever() {
- }
-
- public void register(QueryUnitAttemptId id, RetrieverHandler handler) {
- synchronized (handlerMap) {
- if (!handlerMap.containsKey(id.toString())) {
- handlerMap.put(id.toString(), handler);
- }
- }
- }
-
- public void unregister(QueryUnitAttemptId id) {
- synchronized (handlerMap) {
- if (handlerMap.containsKey(id.toString())) {
- handlerMap.remove(id.toString());
- }
- }
- }
-
- /* (non-Javadoc)
- * @see tajo.worker.dataserver.retriever.DataRetriever#handle(org.jboss.netty.channel.ChannelHandlerContext, org.jboss.netty.handler.codec.http.HttpRequest)
- */
- @Override
- public FileChunk [] handle(ChannelHandlerContext ctx, HttpRequest request)
- throws IOException {
-
- final Map<String, List<String>> params =
- new QueryStringDecoder(request.getUri()).getParameters();
-
- if (!params.containsKey("qid")) {
- throw new FileNotFoundException("No such qid: " + params.containsKey("qid"));
- }
-
- if (params.containsKey("sid")) {
- List<FileChunk> chunks = Lists.newArrayList();
- List<String> qids = splitMaps(params.get("qid"));
- for (String qid : qids) {
- String[] ids = qid.split("_");
- SubQueryId suid = TajoIdUtils.newSubQueryId(params.get("sid").get(0));
- QueryUnitId quid = new QueryUnitId(suid, Integer.parseInt(ids[0]));
- QueryUnitAttemptId attemptId = new QueryUnitAttemptId(quid,
- Integer.parseInt(ids[1]));
- RetrieverHandler handler = handlerMap.get(attemptId.toString());
- FileChunk chunk = handler.get(params);
- chunks.add(chunk);
- }
- return chunks.toArray(new FileChunk[chunks.size()]);
- } else {
- RetrieverHandler handler = handlerMap.get(params.get("qid").get(0));
- FileChunk chunk = handler.get(params);
- if (chunk == null) {
- if (params.containsKey("qid")) { // if there is no content corresponding to the query
- return null;
- } else { // if there is no
- throw new FileNotFoundException("No such a file corresponding to " + params.get("qid"));
- }
- }
-
- File file = chunk.getFile();
- if (file.isHidden() || !file.exists()) {
- throw new FileNotFoundException("No such file: " + file.getAbsolutePath());
- }
- if (!file.isFile()) {
- throw new FileAccessForbiddenException(file.getAbsolutePath() + " is not file");
- }
-
- return new FileChunk[] {chunk};
- }
- }
-
- private List<String> splitMaps(List<String> qids) {
- if (null == qids) {
- LOG.error("QueryUnitId is EMPTY");
- return null;
- }
-
- final List<String> ret = new ArrayList<String>();
- for (String qid : qids) {
- Collections.addAll(ret, qid.split(","));
- }
- return ret;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/tajo/worker/dataserver/retriever/DataRetriever.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/worker/dataserver/retriever/DataRetriever.java b/tajo-core/tajo-core-backend/src/main/java/tajo/worker/dataserver/retriever/DataRetriever.java
deleted file mode 100644
index 83d0540..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/worker/dataserver/retriever/DataRetriever.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package tajo.worker.dataserver.retriever;
-
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.handler.codec.http.HttpRequest;
-
-import java.io.IOException;
-
-public interface DataRetriever {
- FileChunk [] handle(ChannelHandlerContext ctx, HttpRequest request)
- throws IOException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/tajo/worker/dataserver/retriever/DirectoryRetriever.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/worker/dataserver/retriever/DirectoryRetriever.java b/tajo-core/tajo-core-backend/src/main/java/tajo/worker/dataserver/retriever/DirectoryRetriever.java
deleted file mode 100644
index bd44878..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/worker/dataserver/retriever/DirectoryRetriever.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package tajo.worker.dataserver.retriever;
-
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.handler.codec.http.HttpRequest;
-import tajo.worker.dataserver.FileAccessForbiddenException;
-import tajo.worker.dataserver.HttpDataServerHandler;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-
-public class DirectoryRetriever implements DataRetriever {
- public String baseDir;
-
- public DirectoryRetriever(String baseDir) {
- this.baseDir = baseDir;
- }
-
- @Override
- public FileChunk [] handle(ChannelHandlerContext ctx, HttpRequest request)
- throws IOException {
- final String path = HttpDataServerHandler.sanitizeUri(request.getUri());
- if (path == null) {
- throw new IllegalArgumentException("Wrong path: " +path);
- }
-
- File file = new File(baseDir, path);
- if (file.isHidden() || !file.exists()) {
- throw new FileNotFoundException("No such file: " + baseDir + "/" + path);
- }
- if (!file.isFile()) {
- throw new FileAccessForbiddenException("No such file: "
- + baseDir + "/" + path);
- }
-
- return new FileChunk[] {new FileChunk(file, 0, file.length())};
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/tajo/worker/dataserver/retriever/FileChunk.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/worker/dataserver/retriever/FileChunk.java b/tajo-core/tajo-core-backend/src/main/java/tajo/worker/dataserver/retriever/FileChunk.java
deleted file mode 100644
index 9af274b..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/worker/dataserver/retriever/FileChunk.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package tajo.worker.dataserver.retriever;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-
-public class FileChunk {
- private final File file;
- private final long startOffset;
- private final long length;
-
- public FileChunk(File file, long startOffset, long length) throws FileNotFoundException {
- this.file = file;
- this.startOffset = startOffset;
- this.length = length;
- }
-
- public File getFile() {
- return this.file;
- }
-
- public long startOffset() {
- return this.startOffset;
- }
-
- public long length() {
- return this.length;
- }
-
- public String toString() {
- return " (start=" + startOffset() + ", length=" + length + ") "
- + file.getAbsolutePath();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/tajo/worker/dataserver/retriever/RetrieverHandler.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/worker/dataserver/retriever/RetrieverHandler.java b/tajo-core/tajo-core-backend/src/main/java/tajo/worker/dataserver/retriever/RetrieverHandler.java
deleted file mode 100644
index ae2d5c4..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/worker/dataserver/retriever/RetrieverHandler.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package tajo.worker.dataserver.retriever;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-public interface RetrieverHandler {
- /**
- *
- * @param kvs url-decoded key/value pairs
- * @return a desired part of a file
- * @throws IOException
- */
- public FileChunk get(Map<String, List<String>> kvs) throws IOException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/proto/CatalogProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/CatalogProtocol.proto b/tajo-core/tajo-core-backend/src/main/proto/CatalogProtocol.proto
index d273059..a7866ac 100644
--- a/tajo-core/tajo-core-backend/src/main/proto/CatalogProtocol.proto
+++ b/tajo-core/tajo-core-backend/src/main/proto/CatalogProtocol.proto
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-option java_package = "tajo.catalog";
+option java_package = "org.apache.tajo.catalog";
option java_outer_classname = "CatalogProtocol";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/proto/CatalogProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/CatalogProtos.proto b/tajo-core/tajo-core-backend/src/main/proto/CatalogProtos.proto
index 36b33c1..ee7e277 100644
--- a/tajo-core/tajo-core-backend/src/main/proto/CatalogProtos.proto
+++ b/tajo-core/tajo-core-backend/src/main/proto/CatalogProtos.proto
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-option java_package = "tajo.catalog.proto";
+option java_package = "org.apache.tajo.catalog.proto";
option java_outer_classname = "CatalogProtos";
option optimize_for = SPEED;
option java_generic_services = false;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/proto/ClientProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/ClientProtocol.proto b/tajo-core/tajo-core-backend/src/main/proto/ClientProtocol.proto
index e78c87f..cbcccd3 100644
--- a/tajo-core/tajo-core-backend/src/main/proto/ClientProtocol.proto
+++ b/tajo-core/tajo-core-backend/src/main/proto/ClientProtocol.proto
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-option java_package = "tajo.client";
+option java_package = "org.apache.tajo.client";
option java_outer_classname = "ClientProtocol";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/proto/DataTypes.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/DataTypes.proto b/tajo-core/tajo-core-backend/src/main/proto/DataTypes.proto
index 262ecac..3dbf6fa 100644
--- a/tajo-core/tajo-core-backend/src/main/proto/DataTypes.proto
+++ b/tajo-core/tajo-core-backend/src/main/proto/DataTypes.proto
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-option java_package = "tajo.common";
+option java_package = "org.apache.tajo.common";
option java_outer_classname = "TajoDataTypes";
option optimize_for = SPEED;
option java_generic_services = false;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/proto/MasterWorkerProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/MasterWorkerProtocol.proto b/tajo-core/tajo-core-backend/src/main/proto/MasterWorkerProtocol.proto
index c2f2aab..1ddf24a 100644
--- a/tajo-core/tajo-core-backend/src/main/proto/MasterWorkerProtocol.proto
+++ b/tajo-core/tajo-core-backend/src/main/proto/MasterWorkerProtocol.proto
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-option java_package = "tajo.ipc";
+option java_package = "org.apache.tajo.ipc";
option java_outer_classname = "MasterWorkerProtocol";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/proto/MasterWorkerProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/MasterWorkerProtos.proto b/tajo-core/tajo-core-backend/src/main/proto/MasterWorkerProtos.proto
index d8863e4..9fea5be 100644
--- a/tajo-core/tajo-core-backend/src/main/proto/MasterWorkerProtos.proto
+++ b/tajo-core/tajo-core-backend/src/main/proto/MasterWorkerProtos.proto
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-option java_package = "tajo.engine";
+option java_package = "org.apache.tajo.engine";
option java_outer_classname = "MasterWorkerProtos";
option java_generic_services = false;
option java_generate_equals_and_hash = true;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/proto/PrimitiveProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/PrimitiveProtos.proto b/tajo-core/tajo-core-backend/src/main/proto/PrimitiveProtos.proto
index 5fa6903..e722190 100644
--- a/tajo-core/tajo-core-backend/src/main/proto/PrimitiveProtos.proto
+++ b/tajo-core/tajo-core-backend/src/main/proto/PrimitiveProtos.proto
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-option java_package = "tajo.rpc.protocolrecords";
+option java_package = "org.apache.tajo.rpc.protocolrecords";
option java_outer_classname = "PrimitiveProtos";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/proto/TajoIdProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/TajoIdProtos.proto b/tajo-core/tajo-core-backend/src/main/proto/TajoIdProtos.proto
index abd47c0..04c67f2 100644
--- a/tajo-core/tajo-core-backend/src/main/proto/TajoIdProtos.proto
+++ b/tajo-core/tajo-core-backend/src/main/proto/TajoIdProtos.proto
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-option java_package = "tajo";
+option java_package = "org.apache.tajo";
option java_outer_classname = "TajoIdProtos";
option java_generic_services = false;
option java_generate_equals_and_hash = true;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/proto/tajo_protos.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/tajo_protos.proto b/tajo-core/tajo-core-backend/src/main/proto/tajo_protos.proto
index d4d2fee..3d22ff5 100644
--- a/tajo-core/tajo-core-backend/src/main/proto/tajo_protos.proto
+++ b/tajo-core/tajo-core-backend/src/main/proto/tajo_protos.proto
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-option java_package = "tajo";
+option java_package = "org.apache.tajo";
option java_outer_classname = "TajoProtos";
option java_generic_services = false;
option java_generate_equals_and_hash = true;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/BackendTestingUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/BackendTestingUtil.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/BackendTestingUtil.java
new file mode 100644
index 0000000..ea711b8
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/BackendTestingUtil.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ *
+ */
+package org.apache.tajo;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.parser.QueryAnalyzer;
+import org.apache.tajo.engine.planner.*;
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.engine.planner.physical.PhysicalExec;
+import org.apache.tajo.engine.query.ResultSetImpl;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.util.FileUtil;
+import org.apache.tajo.util.TUtil;
+
+import java.io.File;
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.util.List;
+import java.util.UUID;
+
+public class BackendTestingUtil {
+ public final static Schema mockupSchema;
+ public final static TableMeta mockupMeta;
+
+ static {
+ mockupSchema = new Schema();
+ mockupSchema.addColumn("deptname", Type.TEXT);
+ mockupSchema.addColumn("score", Type.INT4);
+ mockupMeta = CatalogUtil.newTableMeta(mockupSchema, StoreType.CSV);
+ }
+
+ public static void writeTmpTable(TajoConf conf, Path path,
+ String tableName, boolean writeMeta)
+ throws IOException {
+ StorageManager sm = StorageManager.get(conf, path);
+ FileSystem fs = sm.getFileSystem();
+ Appender appender;
+
+ Path tablePath = StorageUtil.concatPath(path, tableName, "table.csv");
+ if (fs.exists(tablePath.getParent())) {
+ fs.delete(tablePath.getParent(), true);
+ }
+ fs.mkdirs(tablePath.getParent());
+
+ if (writeMeta) {
+ FileUtil.writeProto(fs, new Path(tablePath.getParent(), ".meta"), mockupMeta.getProto());
+ }
+ appender = StorageManager.getAppender(conf, mockupMeta, tablePath);
+ appender.init();
+
+ int deptSize = 10000;
+ int tupleNum = 100;
+ Tuple tuple;
+ for (int i = 0; i < tupleNum; i++) {
+ tuple = new VTuple(2);
+ String key = "test" + (i % deptSize);
+ tuple.put(0, DatumFactory.createText(key));
+ tuple.put(1, DatumFactory.createInt4(i + 1));
+ appender.addTuple(tuple);
+ }
+ appender.close();
+ }
+
+ public static void writeTmpTable(TajoConf conf, String parent,
+ String tableName, boolean writeMeta) throws IOException {
+ writeTmpTable(conf, new Path(parent), tableName, writeMeta);
+ }
+
+ private TajoConf conf;
+ private CatalogService catalog;
+ private QueryAnalyzer analyzer;
+ private LogicalPlanner planner;
+ public BackendTestingUtil(TajoConf conf) throws IOException {
+ this.conf = conf;
+ this.catalog = new LocalCatalog(conf);
+ analyzer = new QueryAnalyzer(catalog);
+ planner = new LogicalPlanner(catalog);
+ }
+
+ public ResultSet run(String [] tableNames, File [] tables, Schema [] schemas, String query)
+ throws IOException {
+ Path workDir = createTmpTestDir();
+ StorageManager sm = StorageManager.get(new TajoConf(), workDir);
+ List<Fragment> frags = Lists.newArrayList();
+ for (int i = 0; i < tableNames.length; i++) {
+ Fragment [] splits = sm.split(tableNames[i], new Path(tables[i].getAbsolutePath()));
+ for (Fragment f : splits) {
+ frags.add(f);
+ }
+ }
+
+ TaskAttemptContext ctx = new TaskAttemptContext(conf,
+ TUtil.newQueryUnitAttemptId(),
+ frags.toArray(new Fragment[frags.size()]), workDir);
+ PlanningContext context = analyzer.parse(query);
+ LogicalNode plan = planner.createPlan(context);
+ plan = LogicalOptimizer.optimize(context, plan);
+ PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+ PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+ return new ResultSetImpl(conf, new Path(workDir, "out"));
+ }
+
+ public static Path createTmpTestDir() throws IOException {
+ String randomStr = UUID.randomUUID().toString();
+ FileSystem fs = FileSystem.getLocal(new Configuration());
+ Path dir = new Path("target/test-data", randomStr);
+ // Have it cleaned up on exit
+ if (fs.exists(dir)) {
+ fs.delete(dir, true);
+ }
+ return dir;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/IntegrationTest.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/IntegrationTest.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/IntegrationTest.java
new file mode 100644
index 0000000..755b90a
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/IntegrationTest.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo;
+
+public interface IntegrationTest {
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java
new file mode 100644
index 0000000..6425192
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo;
+
+import com.google.protobuf.ServiceException;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.client.TajoClient;
+import org.apache.tajo.conf.TajoConf;
+
+import java.io.IOException;
+import java.sql.ResultSet;
+
+public class LocalTajoTestingUtility {
+ private TajoTestingCluster util;
+ private TajoConf conf;
+ private TajoClient client;
+
+ public void setup(String[] names,
+ String[] tablepaths,
+ Schema[] schemas,
+ Options option) throws Exception {
+
+ util = new TajoTestingCluster();
+ util.startMiniCluster(1);
+ conf = util.getConfiguration();
+ client = new TajoClient(conf);
+
+ FileSystem fs = util.getDefaultFileSystem();
+ Path rootDir = util.getMaster().
+ getStorageManager().getBaseDir();
+ fs.mkdirs(rootDir);
+ for (int i = 0; i < tablepaths.length; i++) {
+ Path localPath = new Path(tablepaths[i]);
+ Path tablePath = new Path(rootDir, names[i]);
+ fs.mkdirs(tablePath);
+ Path dataPath = new Path(tablePath, "data");
+ fs.mkdirs(dataPath);
+ Path dfsPath = new Path(dataPath, localPath.getName());
+ fs.copyFromLocalFile(localPath, dfsPath);
+ TableMeta meta = CatalogUtil.newTableMeta(schemas[i],
+ CatalogProtos.StoreType.CSV, option);
+ client.createTable(names[i], tablePath, meta);
+ }
+ }
+
+ public TajoTestingCluster getTestingCluster() {
+ return util;
+ }
+
+ public ResultSet execute(String query) throws IOException, ServiceException {
+ return client.executeQueryAndGetResult(query);
+ }
+
+ public void shutdown() throws IOException {
+ client.close();
+ util.shutdownMiniCluster();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/MiniTajoYarnCluster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/MiniTajoYarnCluster.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/MiniTajoYarnCluster.java
new file mode 100644
index 0000000..8de0f0b
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/MiniTajoYarnCluster.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.LocalContainerLauncher;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
+import org.apache.hadoop.util.JarFinder;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.service.Service;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.pullserver.PullServerAuxService;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * Configures and starts the Tajo-specific components in the YARN cluster.
+ *
+ */
+public class MiniTajoYarnCluster extends MiniYARNCluster {
+
+ public static final String APPJAR = JarFinder
+ .getJar(LocalContainerLauncher.class);
+
+ private static final Log LOG = LogFactory.getLog(MiniTajoYarnCluster.class);
+
+ public MiniTajoYarnCluster(String testName) {
+ this(testName, 1);
+ }
+
+ public MiniTajoYarnCluster(String testName, int noOfNMs) {
+ super(testName, noOfNMs, 1, 1);
+ }
+
+ @Override
+ public void init(Configuration conf) {
+ conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
+ if (conf.get(MRJobConfig.MR_AM_STAGING_DIR) == null) {
+ conf.set(MRJobConfig.MR_AM_STAGING_DIR, new File(getTestWorkDir(),
+ "apps_staging_dir/").getAbsolutePath());
+ }
+ conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "000");
+
+ try {
+ Path stagingPath = FileContext.getFileContext(conf).makeQualified(
+ new Path(conf.get(MRJobConfig.MR_AM_STAGING_DIR)));
+ FileContext fc=FileContext.getFileContext(stagingPath.toUri(), conf);
+ if (fc.util().exists(stagingPath)) {
+ LOG.info(stagingPath + " exists! deleting...");
+ fc.delete(stagingPath, true);
+ }
+ LOG.info("mkdir: " + stagingPath);
+ //mkdir the staging directory so that right permissions are set while running as proxy user
+ fc.mkdir(stagingPath, null, true);
+ //mkdir done directory as well
+ String doneDir = JobHistoryUtils
+ .getConfiguredHistoryServerDoneDirPrefix(conf);
+ Path doneDirPath = fc.makeQualified(new Path(doneDir));
+ fc.mkdir(doneDirPath, null, true);
+ } catch (IOException e) {
+ throw new YarnException("Could not create staging directory. ", e);
+ }
+ conf.set(MRConfig.MASTER_ADDRESS, "test"); // The default is local because of
+ // which shuffle doesn't happen
+ //configure the shuffle service in NM
+ conf.setStrings(YarnConfiguration.NM_AUX_SERVICES, PullServerAuxService.PULLSERVER_SERVICEID);
+ conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT,
+ PullServerAuxService.PULLSERVER_SERVICEID), PullServerAuxService.class,
+ Service.class);
+
+ // Non-standard shuffle port
+ conf.setInt(TajoConf.ConfVars.PULLSERVER_PORT.name(), 0);
+
+ conf.setClass(YarnConfiguration.NM_CONTAINER_EXECUTOR,
+ DefaultContainerExecutor.class, ContainerExecutor.class);
+
+ // TestMRJobs is for testing non-uberized operation only; see TestUberAM
+ // for corresponding uberized tests.
+ conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+
+ super.init(conf);
+ }
+
+ @Override
+ public void start() {
+ super.start();
+
+ LOG.info("MiniTajoYarn NM Local Dir: " + getConfig().get(YarnConfiguration.NM_LOCAL_DIRS));
+ }
+
+ private class JobHistoryServerWrapper extends AbstractService {
+ public JobHistoryServerWrapper() {
+ super(JobHistoryServerWrapper.class.getName());
+ }
+
+ @Override
+ public synchronized void start() {
+ try {
+ if (!getConfig().getBoolean(
+ JHAdminConfig.MR_HISTORY_MINICLUSTER_FIXED_PORTS,
+ JHAdminConfig.DEFAULT_MR_HISTORY_MINICLUSTER_FIXED_PORTS)) {
+ // pick free random ports.
+ getConfig().set(JHAdminConfig.MR_HISTORY_ADDRESS,
+ MiniYARNCluster.getHostname() + ":0");
+ getConfig().set(JHAdminConfig.MR_HISTORY_WEBAPP_ADDRESS,
+ MiniYARNCluster.getHostname() + ":0");
+ }
+ super.start();
+ } catch (Throwable t) {
+ throw new YarnException(t);
+ }
+
+ LOG.info("MiniMRYARN ResourceManager address: " +
+ getConfig().get(YarnConfiguration.RM_ADDRESS));
+ LOG.info("MiniMRYARN ResourceManager web address: " +
+ getConfig().get(YarnConfiguration.RM_WEBAPP_ADDRESS));
+ LOG.info("MiniMRYARN HistoryServer address: " +
+ getConfig().get(JHAdminConfig.MR_HISTORY_ADDRESS));
+ LOG.info("MiniMRYARN HistoryServer web address: " +
+ getConfig().get(JHAdminConfig.MR_HISTORY_WEBAPP_ADDRESS));
+ }
+
+ @Override
+ public synchronized void stop() {
+ super.stop();
+ }
+ }
+
+ public static void main(String [] args) {
+ MiniTajoYarnCluster cluster = new MiniTajoYarnCluster(MiniTajoYarnCluster.class.getName());
+ cluster.init(new TajoConf());
+ cluster.start();
+ }
+}
+