You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/12/08 09:29:13 UTC
[1/3] tajo git commit: TAJO-1229: rename tajo-yarn-pullserver to
tajo-pullserver.
Repository: tajo
Updated Branches:
refs/heads/master facd1ddcc -> b5aa78046
http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java
----------------------------------------------------------------------
diff --git a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java
deleted file mode 100644
index 1c63c8a..0000000
--- a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java
+++ /dev/null
@@ -1,654 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.pullserver;
-
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.commons.codec.binary.Base64;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalDirAllocator;
-import org.apache.hadoop.fs.LocalFileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DataInputByteBuffer;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.ReadaheadPool;
-import org.apache.hadoop.metrics2.MetricsSystem;
-import org.apache.hadoop.metrics2.annotation.Metric;
-import org.apache.hadoop.metrics2.annotation.Metrics;
-import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
-import org.apache.hadoop.metrics2.lib.MutableCounterInt;
-import org.apache.hadoop.metrics2.lib.MutableCounterLong;
-import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
-import org.apache.hadoop.security.ssl.SSLFactory;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
-import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
-import org.apache.hadoop.yarn.server.api.AuxiliaryService;
-import org.apache.tajo.QueryId;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.conf.TajoConf.ConfVars;
-import org.apache.tajo.pullserver.retriever.FileChunk;
-import org.apache.tajo.storage.RowStoreUtil;
-import org.apache.tajo.storage.RowStoreUtil.RowStoreDecoder;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.TupleComparator;
-import org.apache.tajo.storage.index.bst.BSTIndex;
-import org.apache.tajo.util.TajoIdUtils;
-import org.jboss.netty.bootstrap.ServerBootstrap;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.*;
-import org.jboss.netty.channel.group.ChannelGroup;
-import org.jboss.netty.channel.group.DefaultChannelGroup;
-import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
-import org.jboss.netty.handler.codec.frame.TooLongFrameException;
-import org.jboss.netty.handler.codec.http.*;
-import org.jboss.netty.handler.ssl.SslHandler;
-import org.jboss.netty.handler.stream.ChunkedWriteHandler;
-import org.jboss.netty.util.CharsetUtil;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.net.InetSocketAddress;
-import java.net.URI;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-
-import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
-import static org.jboss.netty.handler.codec.http.HttpHeaders.isKeepAlive;
-import static org.jboss.netty.handler.codec.http.HttpHeaders.setContentLength;
-import static org.jboss.netty.handler.codec.http.HttpMethod.GET;
-import static org.jboss.netty.handler.codec.http.HttpResponseStatus.*;
-import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;
-
-public class PullServerAuxService extends AuxiliaryService {
-
- private static final Log LOG = LogFactory.getLog(PullServerAuxService.class);
-
- public static final String SHUFFLE_MANAGE_OS_CACHE = "tajo.pullserver.manage.os.cache";
- public static final boolean DEFAULT_SHUFFLE_MANAGE_OS_CACHE = true;
-
- public static final String SHUFFLE_READAHEAD_BYTES = "tajo.pullserver.readahead.bytes";
- public static final int DEFAULT_SHUFFLE_READAHEAD_BYTES = 4 * 1024 * 1024;
-
- private int port;
- private ChannelFactory selector;
- private final ChannelGroup accepted = new DefaultChannelGroup();
- private HttpPipelineFactory pipelineFact;
- private int sslFileBufferSize;
-
- private ApplicationId appId;
- private QueryId queryId;
- private FileSystem localFS;
-
- /**
- * Should the shuffle use posix_fadvise calls to manage the OS cache during
- * sendfile
- */
- private boolean manageOsCache;
- private int readaheadLength;
- private ReadaheadPool readaheadPool = ReadaheadPool.getInstance();
-
-
- public static final String PULLSERVER_SERVICEID = "tajo.pullserver";
-
- private static final Map<String,String> userRsrc =
- new ConcurrentHashMap<String,String>();
- private static String userName;
-
- public static final String SUFFLE_SSL_FILE_BUFFER_SIZE_KEY =
- "tajo.pullserver.ssl.file.buffer.size";
-
- public static final int DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE = 60 * 1024;
-
- @Metrics(name="PullServerShuffleMetrics", about="PullServer output metrics", context="tajo")
- static class ShuffleMetrics implements ChannelFutureListener {
- @Metric({"OutputBytes","PullServer output in bytes"})
- MutableCounterLong shuffleOutputBytes;
- @Metric({"Failed","# of failed shuffle outputs"})
- MutableCounterInt shuffleOutputsFailed;
- @Metric({"Succeeded","# of succeeded shuffle outputs"})
- MutableCounterInt shuffleOutputsOK;
- @Metric({"Connections","# of current shuffle connections"})
- MutableGaugeInt shuffleConnections;
-
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- if (future.isSuccess()) {
- shuffleOutputsOK.incr();
- } else {
- shuffleOutputsFailed.incr();
- }
- shuffleConnections.decr();
- }
- }
-
- final ShuffleMetrics metrics;
-
- PullServerAuxService(MetricsSystem ms) {
- super("httpshuffle");
- metrics = ms.register(new ShuffleMetrics());
- }
-
- @SuppressWarnings("UnusedDeclaration")
- public PullServerAuxService() {
- this(DefaultMetricsSystem.instance());
- }
-
- /**
- * Serialize the shuffle port into a ByteBuffer for use later on.
- * @param port the port to be sent to the ApplciationMaster
- * @return the serialized form of the port.
- */
- public static ByteBuffer serializeMetaData(int port) throws IOException {
- //TODO these bytes should be versioned
- DataOutputBuffer port_dob = new DataOutputBuffer();
- port_dob.writeInt(port);
- return ByteBuffer.wrap(port_dob.getData(), 0, port_dob.getLength());
- }
-
- /**
- * A helper function to deserialize the metadata returned by PullServerAuxService.
- * @param meta the metadata returned by the PullServerAuxService
- * @return the port the PullServer Handler is listening on to serve shuffle data.
- */
- public static int deserializeMetaData(ByteBuffer meta) throws IOException {
- //TODO this should be returning a class not just an int
- DataInputByteBuffer in = new DataInputByteBuffer();
- in.reset(meta);
- return in.readInt();
- }
-
- @Override
- public void initializeApplication(ApplicationInitializationContext appInitContext) {
- // TODO these bytes should be versioned
- // TODO: Once SHuffle is out of NM, this can use MR APIs
- this.appId = appInitContext.getApplicationId();
- this.queryId = TajoIdUtils.parseQueryId(appId.toString());
- this.userName = appInitContext.getUser();
- userRsrc.put(this.appId.toString(), this.userName);
- }
-
- @Override
- public void stopApplication(ApplicationTerminationContext appStopContext) {
- userRsrc.remove(appStopContext.getApplicationId().toString());
- }
-
- @Override
- public synchronized void init(Configuration conf) {
- try {
- manageOsCache = conf.getBoolean(SHUFFLE_MANAGE_OS_CACHE,
- DEFAULT_SHUFFLE_MANAGE_OS_CACHE);
-
- readaheadLength = conf.getInt(SHUFFLE_READAHEAD_BYTES,
- DEFAULT_SHUFFLE_READAHEAD_BYTES);
-
- ThreadFactory bossFactory = new ThreadFactoryBuilder()
- .setNameFormat("PullServerAuxService Netty Boss #%d")
- .build();
- ThreadFactory workerFactory = new ThreadFactoryBuilder()
- .setNameFormat("PullServerAuxService Netty Worker #%d")
- .build();
-
- selector = new NioServerSocketChannelFactory(
- Executors.newCachedThreadPool(bossFactory),
- Executors.newCachedThreadPool(workerFactory));
-
- localFS = new LocalFileSystem();
- super.init(new Configuration(conf));
- } catch (Throwable t) {
- LOG.error(t);
- }
- }
-
- // TODO change AbstractService to throw InterruptedException
- @Override
- public synchronized void start() {
- Configuration conf = getConfig();
- ServerBootstrap bootstrap = new ServerBootstrap(selector);
- try {
- pipelineFact = new HttpPipelineFactory(conf);
- } catch (Exception ex) {
- throw new RuntimeException(ex);
- }
- bootstrap.setPipelineFactory(pipelineFact);
- port = conf.getInt(ConfVars.PULLSERVER_PORT.varname,
- ConfVars.PULLSERVER_PORT.defaultIntVal);
- Channel ch = bootstrap.bind(new InetSocketAddress(port));
- accepted.add(ch);
- port = ((InetSocketAddress)ch.getLocalAddress()).getPort();
- conf.set(ConfVars.PULLSERVER_PORT.varname, Integer.toString(port));
- pipelineFact.PullServer.setPort(port);
- LOG.info(getName() + " listening on port " + port);
- super.start();
-
- sslFileBufferSize = conf.getInt(SUFFLE_SSL_FILE_BUFFER_SIZE_KEY,
- DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE);
- }
-
- public int getPort() {
- return port;
- }
-
- @Override
- public synchronized void stop() {
- try {
- accepted.close().awaitUninterruptibly(10, TimeUnit.SECONDS);
- ServerBootstrap bootstrap = new ServerBootstrap(selector);
- bootstrap.releaseExternalResources();
- pipelineFact.destroy();
-
- localFS.close();
- } catch (Throwable t) {
- LOG.error(t);
- } finally {
- super.stop();
- }
- }
-
- @Override
- public synchronized ByteBuffer getMetaData() {
- try {
- return serializeMetaData(port);
- } catch (IOException e) {
- LOG.error("Error during getMeta", e);
- // TODO add API to AuxiliaryServices to report failures
- return null;
- }
- }
-
- class HttpPipelineFactory implements ChannelPipelineFactory {
-
- final PullServer PullServer;
- private SSLFactory sslFactory;
-
- public HttpPipelineFactory(Configuration conf) throws Exception {
- PullServer = new PullServer(conf);
- if (conf.getBoolean(ConfVars.SHUFFLE_SSL_ENABLED_KEY.varname,
- ConfVars.SHUFFLE_SSL_ENABLED_KEY.defaultBoolVal)) {
- sslFactory = new SSLFactory(SSLFactory.Mode.SERVER, conf);
- sslFactory.init();
- }
- }
-
- public void destroy() {
- if (sslFactory != null) {
- sslFactory.destroy();
- }
- }
-
- @Override
- public ChannelPipeline getPipeline() throws Exception {
- ChannelPipeline pipeline = Channels.pipeline();
- if (sslFactory != null) {
- pipeline.addLast("ssl", new SslHandler(sslFactory.createSSLEngine()));
- }
- pipeline.addLast("decoder", new HttpRequestDecoder());
- pipeline.addLast("aggregator", new HttpChunkAggregator(1 << 16));
- pipeline.addLast("encoder", new HttpResponseEncoder());
- pipeline.addLast("chunking", new ChunkedWriteHandler());
- pipeline.addLast("shuffle", PullServer);
- return pipeline;
- // TODO factor security manager into pipeline
- // TODO factor out encode/decode to permit binary shuffle
- // TODO factor out decode of index to permit alt. models
- }
- }
-
- class PullServer extends SimpleChannelUpstreamHandler {
- private final Configuration conf;
- private final LocalDirAllocator lDirAlloc = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);
- private int port;
-
- public PullServer(Configuration conf) {
- this.conf = conf;
- this.port = conf.getInt(ConfVars.PULLSERVER_PORT.varname, ConfVars.PULLSERVER_PORT.defaultIntVal);
- }
-
- public void setPort(int port) {
- this.port = port;
- }
-
- private List<String> splitMaps(List<String> mapq) {
- if (null == mapq) {
- return null;
- }
- final List<String> ret = new ArrayList<String>();
- for (String s : mapq) {
- Collections.addAll(ret, s.split(","));
- }
- return ret;
- }
-
- @Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
- throws Exception {
-
- HttpRequest request = (HttpRequest) e.getMessage();
- if (request.getMethod() != GET) {
- sendError(ctx, METHOD_NOT_ALLOWED);
- return;
- }
-
- // Parsing the URL into key-values
- final Map<String, List<String>> params =
- new QueryStringDecoder(request.getUri()).getParameters();
- final List<String> types = params.get("type");
- final List<String> taskIdList = params.get("ta");
- final List<String> subQueryIds = params.get("sid");
- final List<String> partitionIds = params.get("p");
-
- if (types == null || taskIdList == null || subQueryIds == null
- || partitionIds == null) {
- sendError(ctx, "Required type, taskIds, subquery Id, and partition id",
- BAD_REQUEST);
- return;
- }
-
- if (types.size() != 1 || subQueryIds.size() != 1) {
- sendError(ctx, "Required type, taskIds, subquery Id, and partition id",
- BAD_REQUEST);
- return;
- }
-
- final List<FileChunk> chunks = Lists.newArrayList();
-
- String repartitionType = types.get(0);
- String sid = subQueryIds.get(0);
- String partitionId = partitionIds.get(0);
- List<String> taskIds = splitMaps(taskIdList);
-
- // the working dir of tajo worker for each query
- String queryBaseDir = queryId + "/output" + "/";
-
- LOG.info("PullServer request param: repartitionType=" + repartitionType +
- ", sid=" + sid + ", partitionId=" + partitionId + ", taskIds=" + taskIdList);
-
- String taskLocalDir = conf.get(ConfVars.WORKER_TEMPORAL_DIR.varname);
- if (taskLocalDir == null ||
- taskLocalDir.equals("")) {
- LOG.error("Tajo local directory should be specified.");
- }
- LOG.info("PullServer baseDir: " + taskLocalDir + "/" + queryBaseDir);
-
- // if a subquery requires a range partitioning
- if (repartitionType.equals("r")) {
- String ta = taskIds.get(0);
- Path path = localFS.makeQualified(
- lDirAlloc.getLocalPathToRead(queryBaseDir + "/" + sid + "/"
- + ta + "/output/", conf));
-
- String startKey = params.get("start").get(0);
- String endKey = params.get("end").get(0);
- boolean last = params.get("final") != null;
-
- FileChunk chunk;
- try {
- chunk = getFileCunks(path, startKey, endKey, last);
- } catch (Throwable t) {
- LOG.error("ERROR Request: " + request.getUri(), t);
- sendError(ctx, "Cannot get file chunks to be sent", BAD_REQUEST);
- return;
- }
- if (chunk != null) {
- chunks.add(chunk);
- }
-
- // if a subquery requires a hash repartition or a scattered hash repartition
- } else if (repartitionType.equals("h") || repartitionType.equals("s")) {
- for (String ta : taskIds) {
- Path path = localFS.makeQualified(
- lDirAlloc.getLocalPathToRead(queryBaseDir + "/" + sid + "/" +
- ta + "/output/" + partitionId, conf));
- File file = new File(path.toUri());
- FileChunk chunk = new FileChunk(file, 0, file.length());
- chunks.add(chunk);
- }
- } else {
- LOG.error("Unknown repartition type: " + repartitionType);
- return;
- }
-
- // Write the content.
- Channel ch = e.getChannel();
- if (chunks.size() == 0) {
- HttpResponse response = new DefaultHttpResponse(HTTP_1_1, NO_CONTENT);
- ch.write(response);
- if (!isKeepAlive(request)) {
- ch.close();
- }
- } else {
- FileChunk[] file = chunks.toArray(new FileChunk[chunks.size()]);
- HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
- long totalSize = 0;
- for (FileChunk chunk : file) {
- totalSize += chunk.length();
- }
- setContentLength(response, totalSize);
-
- // Write the initial line and the header.
- ch.write(response);
-
- ChannelFuture writeFuture = null;
-
- for (FileChunk chunk : file) {
- writeFuture = sendFile(ctx, ch, chunk);
- if (writeFuture == null) {
- sendError(ctx, NOT_FOUND);
- return;
- }
- }
-
- // Decide whether to close the connection or not.
- if (!isKeepAlive(request)) {
- // Close the connection when the whole content is written out.
- writeFuture.addListener(ChannelFutureListener.CLOSE);
- }
- }
- }
-
- private ChannelFuture sendFile(ChannelHandlerContext ctx,
- Channel ch,
- FileChunk file) throws IOException {
- RandomAccessFile spill;
- try {
- spill = new RandomAccessFile(file.getFile(), "r");
- } catch (FileNotFoundException e) {
- LOG.info(file.getFile() + " not found");
- return null;
- }
- ChannelFuture writeFuture;
- if (ch.getPipeline().get(SslHandler.class) == null) {
- final FadvisedFileRegion partition = new FadvisedFileRegion(spill,
- file.startOffset(), file.length(), manageOsCache, readaheadLength,
- readaheadPool, file.getFile().getAbsolutePath());
- writeFuture = ch.write(partition);
- writeFuture.addListener(new FileCloseListener(partition, null, 0, null));
- } else {
- // HTTPS cannot be done with zero copy.
- final FadvisedChunkedFile chunk = new FadvisedChunkedFile(spill,
- file.startOffset(), file.length(), sslFileBufferSize,
- manageOsCache, readaheadLength, readaheadPool,
- file.getFile().getAbsolutePath());
- writeFuture = ch.write(chunk);
- }
- metrics.shuffleConnections.incr();
- metrics.shuffleOutputBytes.incr(file.length()); // optimistic
- return writeFuture;
- }
-
- private void sendError(ChannelHandlerContext ctx,
- HttpResponseStatus status) {
- sendError(ctx, "", status);
- }
-
- private void sendError(ChannelHandlerContext ctx, String message,
- HttpResponseStatus status) {
- HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status);
- response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8");
- response.setContent(
- ChannelBuffers.copiedBuffer(message, CharsetUtil.UTF_8));
-
- // Close the connection as soon as the error message is sent.
- ctx.getChannel().write(response).addListener(ChannelFutureListener.CLOSE);
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
- throws Exception {
- Channel ch = e.getChannel();
- Throwable cause = e.getCause();
- if (cause instanceof TooLongFrameException) {
- sendError(ctx, BAD_REQUEST);
- return;
- }
-
- LOG.error("PullServer error: ", cause);
- if (ch.isConnected()) {
- LOG.error("PullServer error " + e);
- sendError(ctx, INTERNAL_SERVER_ERROR);
- }
- }
- }
-
- public FileChunk getFileCunks(Path outDir,
- String startKey,
- String endKey,
- boolean last) throws IOException {
- BSTIndex index = new BSTIndex(new TajoConf());
- BSTIndex.BSTIndexReader idxReader =
- index.getIndexReader(new Path(outDir, "index"));
- idxReader.open();
- Schema keySchema = idxReader.getKeySchema();
- TupleComparator comparator = idxReader.getComparator();
-
- LOG.info("BSTIndex is loaded from disk (" + idxReader.getFirstKey() + ", "
- + idxReader.getLastKey());
-
- File data = new File(URI.create(outDir.toUri() + "/output"));
- byte [] startBytes = Base64.decodeBase64(startKey);
- byte [] endBytes = Base64.decodeBase64(endKey);
-
- RowStoreDecoder decoder = RowStoreUtil.createDecoder(keySchema);
- Tuple start;
- Tuple end;
- try {
- start = decoder.toTuple(startBytes);
- } catch (Throwable t) {
- throw new IllegalArgumentException("StartKey: " + startKey
- + ", decoded byte size: " + startBytes.length, t);
- }
-
- try {
- end = decoder.toTuple(endBytes);
- } catch (Throwable t) {
- throw new IllegalArgumentException("EndKey: " + endKey
- + ", decoded byte size: " + endBytes.length, t);
- }
-
-
- if(!comparator.isAscendingFirstKey()) {
- Tuple tmpKey = start;
- start = end;
- end = tmpKey;
- }
-
- LOG.info("GET Request for " + data.getAbsolutePath() + " (start="+start+", end="+ end +
- (last ? ", last=true" : "") + ")");
-
- if (idxReader.getFirstKey() == null && idxReader.getLastKey() == null) { // if # of rows is zero
- LOG.info("There is no contents");
- return null;
- }
-
- if (comparator.compare(end, idxReader.getFirstKey()) < 0 ||
- comparator.compare(idxReader.getLastKey(), start) < 0) {
- LOG.info("Out of Scope (indexed data [" + idxReader.getFirstKey() + ", " + idxReader.getLastKey() +
- "], but request start:" + start + ", end: " + end);
- return null;
- }
-
- long startOffset;
- long endOffset;
- try {
- startOffset = idxReader.find(start);
- } catch (IOException ioe) {
- LOG.error("State Dump (the requested range: "
- + "[" + start + ", " + end +")" + ", idx min: "
- + idxReader.getFirstKey() + ", idx max: "
- + idxReader.getLastKey());
- throw ioe;
- }
- try {
- endOffset = idxReader.find(end);
- if (endOffset == -1) {
- endOffset = idxReader.find(end, true);
- }
- } catch (IOException ioe) {
- LOG.error("State Dump (the requested range: "
- + "[" + start + ", " + end +")" + ", idx min: "
- + idxReader.getFirstKey() + ", idx max: "
- + idxReader.getLastKey());
- throw ioe;
- }
-
- // if startOffset == -1 then case 2-1 or case 3
- if (startOffset == -1) { // this is a hack
- // if case 2-1 or case 3
- try {
- startOffset = idxReader.find(start, true);
- } catch (IOException ioe) {
- LOG.error("State Dump (the requested range: "
- + "[" + start + ", " + end +")" + ", idx min: "
- + idxReader.getFirstKey() + ", idx max: "
- + idxReader.getLastKey());
- throw ioe;
- }
- }
-
- if (startOffset == -1) {
- throw new IllegalStateException("startOffset " + startOffset + " is negative \n" +
- "State Dump (the requested range: "
- + "[" + start + ", " + end +")" + ", idx min: " + idxReader.getFirstKey() + ", idx max: "
- + idxReader.getLastKey());
- }
-
- // if greater than indexed values
- if (last || (endOffset == -1
- && comparator.compare(idxReader.getLastKey(), end) < 0)) {
- endOffset = data.length();
- }
-
- FileChunk chunk = new FileChunk(data, startOffset, endOffset - startOffset);
- LOG.info("Retrieve File Chunk: " + chunk);
- return chunk;
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerUtil.java b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerUtil.java
deleted file mode 100644
index 564950f..0000000
--- a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerUtil.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.pullserver;
-
-import org.apache.commons.lang.reflect.MethodUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.nativeio.NativeIO;
-
-import java.io.FileDescriptor;
-import java.lang.reflect.Method;
-
-public class PullServerUtil {
- private static final Log LOG = LogFactory.getLog(PullServerUtil.class);
-
- private static boolean nativeIOPossible = false;
- private static Method posixFadviseIfPossible;
-
- static {
- if (NativeIO.isAvailable() && loadNativeIO()) {
- nativeIOPossible = true;
- } else {
- LOG.warn("Unable to load hadoop nativeIO");
- }
- }
-
- public static boolean isNativeIOPossible() {
- return nativeIOPossible;
- }
-
- /**
- * Call posix_fadvise on the given file descriptor. See the manpage
- * for this syscall for more information. On systems where this
- * call is not available, does nothing.
- */
- public static void posixFadviseIfPossible(String identifier, java.io.FileDescriptor fd,
- long offset, long len, int flags) {
- if (nativeIOPossible) {
- try {
- posixFadviseIfPossible.invoke(null, identifier, fd, offset, len, flags);
- } catch (Throwable t) {
- nativeIOPossible = false;
- LOG.warn("Failed to manage OS cache for " + identifier, t);
- }
- }
- }
-
- /* load hadoop native method if possible */
- private static boolean loadNativeIO() {
- boolean loaded = true;
- if (nativeIOPossible) return loaded;
-
- Class[] parameters = {String.class, FileDescriptor.class, Long.TYPE, Long.TYPE, Integer.TYPE};
- try {
- Method getCacheManipulator = MethodUtils.getAccessibleMethod(NativeIO.POSIX.class, "getCacheManipulator", new Class[0]);
- Class posixClass;
- if (getCacheManipulator != null) {
- Object posix = MethodUtils.invokeStaticMethod(NativeIO.POSIX.class, "getCacheManipulator", null);
- posixClass = posix.getClass();
- } else {
- posixClass = NativeIO.POSIX.class;
- }
- posixFadviseIfPossible = MethodUtils.getAccessibleMethod(posixClass, "posixFadviseIfPossible", parameters);
- } catch (Throwable e) {
- loaded = false;
- LOG.warn("Failed to access posixFadviseIfPossible :" + e.getMessage());
- }
-
- if (posixFadviseIfPossible == null) {
- loaded = false;
- }
- return loaded;
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServer.java
----------------------------------------------------------------------
diff --git a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServer.java b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServer.java
deleted file mode 100644
index d030eed..0000000
--- a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServer.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.pullserver;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.service.CompositeService;
-import org.apache.tajo.TajoConstants;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.pullserver.PullServerAuxService.PullServer;
-import org.apache.tajo.util.StringUtils;
-
-public class TajoPullServer extends CompositeService {
- private static final Log LOG = LogFactory.getLog(TajoPullServer.class);
-
- private TajoPullServerService pullService;
- private TajoConf systemConf;
-
- public TajoPullServer() {
- super(TajoPullServer.class.getName());
- }
-
- @Override
- public void init(Configuration conf) {
- this.systemConf = (TajoConf)conf;
- pullService = new TajoPullServerService();
- addService(pullService);
-
- super.init(conf);
- }
-
- public void startPullServer(TajoConf systemConf) {
- init(systemConf);
- start();
- }
-
- public void start() {
- super.start();
-
- }
-
- public static void main(String[] args) throws Exception {
- StringUtils.startupShutdownMessage(PullServer.class, args, LOG);
-
- if (!TajoPullServerService.isStandalone()) {
- LOG.fatal("TAJO_PULLSERVER_STANDALONE env variable is not 'true'");
- return;
- }
-
- TajoConf tajoConf = new TajoConf();
- tajoConf.addResource(new Path(TajoConstants.SYSTEM_CONF_FILENAME));
-
- (new TajoPullServer()).startPullServer(tajoConf);
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
----------------------------------------------------------------------
diff --git a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
deleted file mode 100644
index 5a4e69f..0000000
--- a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
+++ /dev/null
@@ -1,808 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.pullserver;
-
-import com.google.common.collect.Lists;
-import org.apache.commons.codec.binary.Base64;
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalDirAllocator;
-import org.apache.hadoop.fs.LocalFileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DataInputByteBuffer;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.ReadaheadPool;
-import org.apache.hadoop.metrics2.MetricsSystem;
-import org.apache.hadoop.metrics2.annotation.Metric;
-import org.apache.hadoop.metrics2.annotation.Metrics;
-import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
-import org.apache.hadoop.metrics2.lib.MutableCounterInt;
-import org.apache.hadoop.metrics2.lib.MutableCounterLong;
-import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
-import org.apache.hadoop.security.ssl.SSLFactory;
-import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.conf.TajoConf.ConfVars;
-import org.apache.tajo.pullserver.retriever.FileChunk;
-import org.apache.tajo.rpc.RpcChannelFactory;
-import org.apache.tajo.storage.HashShuffleAppenderManager;
-import org.apache.tajo.storage.RowStoreUtil;
-import org.apache.tajo.storage.RowStoreUtil.RowStoreDecoder;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.TupleComparator;
-import org.apache.tajo.storage.index.bst.BSTIndex;
-import org.jboss.netty.bootstrap.ServerBootstrap;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.*;
-import org.jboss.netty.channel.group.ChannelGroup;
-import org.jboss.netty.channel.group.DefaultChannelGroup;
-import org.jboss.netty.handler.codec.http.*;
-import org.jboss.netty.handler.ssl.SslHandler;
-import org.jboss.netty.handler.stream.ChunkedWriteHandler;
-import org.jboss.netty.util.CharsetUtil;
-
-import java.io.*;
-import java.net.InetSocketAddress;
-import java.net.URI;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
-import static org.jboss.netty.handler.codec.http.HttpHeaders.isKeepAlive;
-import static org.jboss.netty.handler.codec.http.HttpHeaders.setContentLength;
-import static org.jboss.netty.handler.codec.http.HttpMethod.GET;
-import static org.jboss.netty.handler.codec.http.HttpResponseStatus.*;
-import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;
-
-public class TajoPullServerService extends AbstractService {
-
- private static final Log LOG = LogFactory.getLog(TajoPullServerService.class);
-
- public static final String SHUFFLE_MANAGE_OS_CACHE = "tajo.pullserver.manage.os.cache";
- public static final boolean DEFAULT_SHUFFLE_MANAGE_OS_CACHE = true;
-
- public static final String SHUFFLE_READAHEAD_BYTES = "tajo.pullserver.readahead.bytes";
- public static final int DEFAULT_SHUFFLE_READAHEAD_BYTES = 4 * 1024 * 1024;
-
- private int port;
- private ChannelFactory selector;
- private final ChannelGroup accepted = new DefaultChannelGroup();
- private HttpPipelineFactory pipelineFact;
- private int sslFileBufferSize;
-
- private ApplicationId appId;
- private FileSystem localFS;
-
- /**
- * Should the shuffle use posix_fadvise calls to manage the OS cache during
- * sendfile
- */
- private boolean manageOsCache;
- private int readaheadLength;
- private ReadaheadPool readaheadPool = ReadaheadPool.getInstance();
-
-
- public static final String PULLSERVER_SERVICEID = "tajo.pullserver";
-
- private static final Map<String,String> userRsrc =
- new ConcurrentHashMap<String,String>();
- private String userName;
-
- public static final String SUFFLE_SSL_FILE_BUFFER_SIZE_KEY =
- "tajo.pullserver.ssl.file.buffer.size";
-
- public static final int DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE = 60 * 1024;
-
- private static boolean STANDALONE = false;
-
- static {
- String standalone = System.getenv("TAJO_PULLSERVER_STANDALONE");
- if (!StringUtils.isEmpty(standalone)) {
- STANDALONE = standalone.equalsIgnoreCase("true");
- }
- }
-
- @Metrics(name="PullServerShuffleMetrics", about="PullServer output metrics", context="tajo")
- static class ShuffleMetrics implements ChannelFutureListener {
- @Metric({"OutputBytes","PullServer output in bytes"})
- MutableCounterLong shuffleOutputBytes;
- @Metric({"Failed","# of failed shuffle outputs"})
- MutableCounterInt shuffleOutputsFailed;
- @Metric({"Succeeded","# of succeeded shuffle outputs"})
- MutableCounterInt shuffleOutputsOK;
- @Metric({"Connections","# of current shuffle connections"})
- MutableGaugeInt shuffleConnections;
-
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- if (future.isSuccess()) {
- shuffleOutputsOK.incr();
- } else {
- shuffleOutputsFailed.incr();
- }
- shuffleConnections.decr();
- }
- }
-
- final ShuffleMetrics metrics;
-
- TajoPullServerService(MetricsSystem ms) {
- super("httpshuffle");
- metrics = ms.register(new ShuffleMetrics());
- }
-
- @SuppressWarnings("UnusedDeclaration")
- public TajoPullServerService() {
- this(DefaultMetricsSystem.instance());
- }
-
- /**
- * Serialize the shuffle port into a ByteBuffer for use later on.
- * @param port the port to be sent to the ApplciationMaster
- * @return the serialized form of the port.
- */
- public static ByteBuffer serializeMetaData(int port) throws IOException {
- //TODO these bytes should be versioned
- DataOutputBuffer port_dob = new DataOutputBuffer();
- port_dob.writeInt(port);
- return ByteBuffer.wrap(port_dob.getData(), 0, port_dob.getLength());
- }
-
- /**
- * A helper function to deserialize the metadata returned by PullServerAuxService.
- * @param meta the metadata returned by the PullServerAuxService
- * @return the port the PullServer Handler is listening on to serve shuffle data.
- */
- public static int deserializeMetaData(ByteBuffer meta) throws IOException {
- //TODO this should be returning a class not just an int
- DataInputByteBuffer in = new DataInputByteBuffer();
- in.reset(meta);
- return in.readInt();
- }
-
- public void initApp(String user, ApplicationId appId, ByteBuffer secret) {
- // TODO these bytes should be versioned
- // TODO: Once SHuffle is out of NM, this can use MR APIs
- this.appId = appId;
- this.userName = user;
- userRsrc.put(appId.toString(), user);
- }
-
- public void stopApp(ApplicationId appId) {
- userRsrc.remove(appId.toString());
- }
-
- @Override
- public void init(Configuration conf) {
- try {
- manageOsCache = conf.getBoolean(SHUFFLE_MANAGE_OS_CACHE,
- DEFAULT_SHUFFLE_MANAGE_OS_CACHE);
-
- readaheadLength = conf.getInt(SHUFFLE_READAHEAD_BYTES,
- DEFAULT_SHUFFLE_READAHEAD_BYTES);
-
- int workerNum = conf.getInt("tajo.shuffle.rpc.server.worker-thread-num",
- Runtime.getRuntime().availableProcessors() * 2);
-
- selector = RpcChannelFactory.createServerChannelFactory("PullServerAuxService", workerNum);
-
- localFS = new LocalFileSystem();
-
- conf.setInt(TajoConf.ConfVars.PULLSERVER_PORT.varname
- , TajoConf.ConfVars.PULLSERVER_PORT.defaultIntVal);
- super.init(conf);
- LOG.info("Tajo PullServer initialized: readaheadLength=" + readaheadLength);
- } catch (Throwable t) {
- LOG.error(t);
- }
- }
-
- // TODO change AbstractService to throw InterruptedException
- @Override
- public synchronized void serviceInit(Configuration conf) throws Exception {
- ServerBootstrap bootstrap = new ServerBootstrap(selector);
-
- try {
- pipelineFact = new HttpPipelineFactory(conf);
- } catch (Exception ex) {
- throw new RuntimeException(ex);
- }
- bootstrap.setPipelineFactory(pipelineFact);
-
- port = conf.getInt(ConfVars.PULLSERVER_PORT.varname,
- ConfVars.PULLSERVER_PORT.defaultIntVal);
- Channel ch = bootstrap.bind(new InetSocketAddress(port));
-
- accepted.add(ch);
- port = ((InetSocketAddress)ch.getLocalAddress()).getPort();
- conf.set(ConfVars.PULLSERVER_PORT.varname, Integer.toString(port));
- pipelineFact.PullServer.setPort(port);
- LOG.info(getName() + " listening on port " + port);
-
- sslFileBufferSize = conf.getInt(SUFFLE_SSL_FILE_BUFFER_SIZE_KEY,
- DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE);
-
-
- if (STANDALONE) {
- File pullServerPortFile = getPullServerPortFile();
- if (pullServerPortFile.exists()) {
- pullServerPortFile.delete();
- }
- pullServerPortFile.getParentFile().mkdirs();
- LOG.info("Write PullServerPort to " + pullServerPortFile);
- FileOutputStream out = null;
- try {
- out = new FileOutputStream(pullServerPortFile);
- out.write(("" + port).getBytes());
- } catch (Exception e) {
- LOG.fatal("PullServer exists cause can't write PullServer port to " + pullServerPortFile +
- ", " + e.getMessage(), e);
- System.exit(-1);
- } finally {
- IOUtils.closeStream(out);
- }
- }
- super.serviceInit(conf);
- LOG.info("TajoPullServerService started: port=" + port);
- }
-
- public static boolean isStandalone() {
- return STANDALONE;
- }
-
- private static File getPullServerPortFile() {
- String pullServerPortInfoFile = System.getenv("TAJO_PID_DIR");
- if (StringUtils.isEmpty(pullServerPortInfoFile)) {
- pullServerPortInfoFile = "/tmp";
- }
- return new File(pullServerPortInfoFile + "/pullserver.port");
- }
-
- // TODO change to get port from master or tajoConf
- public static int readPullServerPort() {
- FileInputStream in = null;
- try {
- File pullServerPortFile = getPullServerPortFile();
-
- if (!pullServerPortFile.exists() || pullServerPortFile.isDirectory()) {
- return -1;
- }
- in = new FileInputStream(pullServerPortFile);
- byte[] buf = new byte[1024];
- int readBytes = in.read(buf);
- return Integer.parseInt(new String(buf, 0, readBytes));
- } catch (IOException e) {
- LOG.fatal(e.getMessage(), e);
- return -1;
- } finally {
- IOUtils.closeStream(in);
- }
- }
-
- public int getPort() {
- return port;
- }
-
- @Override
- public synchronized void stop() {
- try {
- accepted.close().awaitUninterruptibly(10, TimeUnit.SECONDS);
- ServerBootstrap bootstrap = new ServerBootstrap(selector);
- bootstrap.releaseExternalResources();
- pipelineFact.destroy();
-
- localFS.close();
- } catch (Throwable t) {
- LOG.error(t);
- } finally {
- super.stop();
- }
- }
-
- public synchronized ByteBuffer getMeta() {
- try {
- return serializeMetaData(port);
- } catch (IOException e) {
- LOG.error("Error during getMeta", e);
- // TODO add API to AuxiliaryServices to report failures
- return null;
- }
- }
-
- class HttpPipelineFactory implements ChannelPipelineFactory {
-
- final PullServer PullServer;
- private SSLFactory sslFactory;
-
- public HttpPipelineFactory(Configuration conf) throws Exception {
- PullServer = new PullServer(conf);
- if (conf.getBoolean(ConfVars.SHUFFLE_SSL_ENABLED_KEY.varname,
- ConfVars.SHUFFLE_SSL_ENABLED_KEY.defaultBoolVal)) {
- sslFactory = new SSLFactory(SSLFactory.Mode.SERVER, conf);
- sslFactory.init();
- }
- }
-
- public void destroy() {
- if (sslFactory != null) {
- sslFactory.destroy();
- }
- }
-
- @Override
- public ChannelPipeline getPipeline() throws Exception {
- ChannelPipeline pipeline = Channels.pipeline();
- if (sslFactory != null) {
- pipeline.addLast("ssl", new SslHandler(sslFactory.createSSLEngine()));
- }
-
- int maxChunkSize = getConfig().getInt(ConfVars.SHUFFLE_FETCHER_CHUNK_MAX_SIZE.varname,
- ConfVars.SHUFFLE_FETCHER_CHUNK_MAX_SIZE.defaultIntVal);
- pipeline.addLast("codec", new HttpServerCodec(4096, 8192, maxChunkSize));
- pipeline.addLast("aggregator", new HttpChunkAggregator(1 << 16));
- pipeline.addLast("chunking", new ChunkedWriteHandler());
- pipeline.addLast("shuffle", PullServer);
- return pipeline;
- // TODO factor security manager into pipeline
- // TODO factor out encode/decode to permit binary shuffle
- // TODO factor out decode of index to permit alt. models
- }
- }
-
-
- Map<String, ProcessingStatus> processingStatusMap = new ConcurrentHashMap<String, ProcessingStatus>();
-
- public void completeFileChunk(FileRegion filePart,
- String requestUri,
- long startTime) {
- ProcessingStatus status = processingStatusMap.get(requestUri);
- if (status != null) {
- status.decrementRemainFiles(filePart, startTime);
- }
- }
-
- class ProcessingStatus {
- String requestUri;
- int numFiles;
- AtomicInteger remainFiles;
- long startTime;
- long makeFileListTime;
- long minTime = Long.MAX_VALUE;
- long maxTime;
- int numSlowFile;
-
- public ProcessingStatus(String requestUri) {
- this.requestUri = requestUri;
- this.startTime = System.currentTimeMillis();
- }
-
- public void setNumFiles(int numFiles) {
- this.numFiles = numFiles;
- this.remainFiles = new AtomicInteger(numFiles);
- }
- public void decrementRemainFiles(FileRegion filePart, long fileStartTime) {
- synchronized(remainFiles) {
- long fileSendTime = System.currentTimeMillis() - fileStartTime;
- if (fileSendTime > 20 * 1000) {
- LOG.info("PullServer send too long time: filePos=" + filePart.getPosition() + ", fileLen=" + filePart.getCount());
- numSlowFile++;
- }
- if (fileSendTime > maxTime) {
- maxTime = fileSendTime;
- }
- if (fileSendTime < minTime) {
- minTime = fileSendTime;
- }
- int remain = remainFiles.decrementAndGet();
- if (remain <= 0) {
- processingStatusMap.remove(requestUri);
- LOG.info("PullServer processing status: totalTime=" + (System.currentTimeMillis() - startTime) + " ms, " +
- "makeFileListTime=" + makeFileListTime + " ms, minTime=" + minTime + " ms, maxTime=" + maxTime + " ms, " +
- "numFiles=" + numFiles + ", numSlowFile=" + numSlowFile);
- }
- }
- }
- }
-
- class PullServer extends SimpleChannelUpstreamHandler {
-
- private final Configuration conf;
-// private final IndexCache indexCache;
- private final LocalDirAllocator lDirAlloc =
- new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);
- private int port;
-
- public PullServer(Configuration conf) throws IOException {
- this.conf = conf;
-// indexCache = new IndexCache(new JobConf(conf));
- this.port = conf.getInt(ConfVars.PULLSERVER_PORT.varname,
- ConfVars.PULLSERVER_PORT.defaultIntVal);
-
- // init local temporal dir
- lDirAlloc.getAllLocalPathsToRead(".", conf);
- }
-
- public void setPort(int port) {
- this.port = port;
- }
-
- private List<String> splitMaps(List<String> mapq) {
- if (null == mapq) {
- return null;
- }
- final List<String> ret = new ArrayList<String>();
- for (String s : mapq) {
- Collections.addAll(ret, s.split(","));
- }
- return ret;
- }
-
- @Override
- public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent evt)
- throws Exception {
-
- accepted.add(evt.getChannel());
- LOG.info(String.format("Current number of shuffle connections (%d)", accepted.size()));
- super.channelOpen(ctx, evt);
-
- }
-
- @Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
- throws Exception {
-
- HttpRequest request = (HttpRequest) e.getMessage();
- if (request.getMethod() != GET) {
- sendError(ctx, METHOD_NOT_ALLOWED);
- return;
- }
-
- ProcessingStatus processingStatus = new ProcessingStatus(request.getUri().toString());
- processingStatusMap.put(request.getUri().toString(), processingStatus);
- // Parsing the URL into key-values
- final Map<String, List<String>> params =
- new QueryStringDecoder(request.getUri()).getParameters();
- final List<String> types = params.get("type");
- final List<String> qids = params.get("qid");
- final List<String> taskIdList = params.get("ta");
- final List<String> subQueryIds = params.get("sid");
- final List<String> partIds = params.get("p");
- final List<String> offsetList = params.get("offset");
- final List<String> lengthList = params.get("length");
-
- if (types == null || subQueryIds == null || qids == null || partIds == null) {
- sendError(ctx, "Required queryId, type, subquery Id, and part id",
- BAD_REQUEST);
- return;
- }
-
- if (qids.size() != 1 && types.size() != 1 || subQueryIds.size() != 1) {
- sendError(ctx, "Required qids, type, taskIds, subquery Id, and part id",
- BAD_REQUEST);
- return;
- }
-
- String partId = partIds.get(0);
- String queryId = qids.get(0);
- String shuffleType = types.get(0);
- String sid = subQueryIds.get(0);
-
- long offset = (offsetList != null && !offsetList.isEmpty()) ? Long.parseLong(offsetList.get(0)) : -1L;
- long length = (lengthList != null && !lengthList.isEmpty()) ? Long.parseLong(lengthList.get(0)) : -1L;
-
- if (!shuffleType.equals("h") && !shuffleType.equals("s") && taskIdList == null) {
- sendError(ctx, "Required taskIds", BAD_REQUEST);
- }
-
- List<String> taskIds = splitMaps(taskIdList);
-
- String queryBaseDir = queryId.toString() + "/output";
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("PullServer request param: shuffleType=" + shuffleType +
- ", sid=" + sid + ", partId=" + partId + ", taskIds=" + taskIdList);
-
- // the working dir of tajo worker for each query
- LOG.debug("PullServer baseDir: " + conf.get(ConfVars.WORKER_TEMPORAL_DIR.varname) + "/" + queryBaseDir);
- }
-
- final List<FileChunk> chunks = Lists.newArrayList();
-
- // if a subquery requires a range shuffle
- if (shuffleType.equals("r")) {
- String ta = taskIds.get(0);
- if(!lDirAlloc.ifExists(queryBaseDir + "/" + sid + "/" + ta + "/output/", conf)){
- LOG.warn(e);
- sendError(ctx, NO_CONTENT);
- return;
- }
- Path path = localFS.makeQualified(
- lDirAlloc.getLocalPathToRead(queryBaseDir + "/" + sid + "/" + ta + "/output/", conf));
- String startKey = params.get("start").get(0);
- String endKey = params.get("end").get(0);
- boolean last = params.get("final") != null;
-
- FileChunk chunk;
- try {
- chunk = getFileCunks(path, startKey, endKey, last);
- } catch (Throwable t) {
- LOG.error("ERROR Request: " + request.getUri(), t);
- sendError(ctx, "Cannot get file chunks to be sent", BAD_REQUEST);
- return;
- }
- if (chunk != null) {
- chunks.add(chunk);
- }
-
- // if a subquery requires a hash shuffle or a scattered hash shuffle
- } else if (shuffleType.equals("h") || shuffleType.equals("s")) {
- int partParentId = HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), (TajoConf) conf);
- String partPath = queryBaseDir + "/" + sid + "/hash-shuffle/" + partParentId + "/" + partId;
- if (!lDirAlloc.ifExists(partPath, conf)) {
- LOG.warn("Partition shuffle file not exists: " + partPath);
- sendError(ctx, NO_CONTENT);
- return;
- }
-
- Path path = localFS.makeQualified(lDirAlloc.getLocalPathToRead(partPath, conf));
-
- File file = new File(path.toUri());
- long startPos = (offset >= 0 && length >= 0) ? offset : 0;
- long readLen = (offset >= 0 && length >= 0) ? length : file.length();
-
- if (startPos >= file.length()) {
- String errorMessage = "Start pos[" + startPos + "] great than file length [" + file.length() + "]";
- LOG.error(errorMessage);
- sendError(ctx, errorMessage, BAD_REQUEST);
- return;
- }
- LOG.info("RequestURL: " + request.getUri() + ", fileLen=" + file.length());
- FileChunk chunk = new FileChunk(file, startPos, readLen);
- chunks.add(chunk);
- } else {
- LOG.error("Unknown shuffle type: " + shuffleType);
- sendError(ctx, "Unknown shuffle type:" + shuffleType, BAD_REQUEST);
- return;
- }
-
- processingStatus.setNumFiles(chunks.size());
- processingStatus.makeFileListTime = System.currentTimeMillis() - processingStatus.startTime;
- // Write the content.
- Channel ch = e.getChannel();
- if (chunks.size() == 0) {
- HttpResponse response = new DefaultHttpResponse(HTTP_1_1, NO_CONTENT);
- ch.write(response);
- if (!isKeepAlive(request)) {
- ch.close();
- }
- } else {
- FileChunk[] file = chunks.toArray(new FileChunk[chunks.size()]);
- HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
- long totalSize = 0;
- for (FileChunk chunk : file) {
- totalSize += chunk.length();
- }
- setContentLength(response, totalSize);
-
- // Write the initial line and the header.
- ch.write(response);
-
- ChannelFuture writeFuture = null;
-
- for (FileChunk chunk : file) {
- writeFuture = sendFile(ctx, ch, chunk, request.getUri().toString());
- if (writeFuture == null) {
- sendError(ctx, NOT_FOUND);
- return;
- }
- }
-
- // Decide whether to close the connection or not.
- if (!isKeepAlive(request)) {
- // Close the connection when the whole content is written out.
- writeFuture.addListener(ChannelFutureListener.CLOSE);
- }
- }
- }
-
- private ChannelFuture sendFile(ChannelHandlerContext ctx,
- Channel ch,
- FileChunk file,
- String requestUri) throws IOException {
- long startTime = System.currentTimeMillis();
- RandomAccessFile spill = null;
- ChannelFuture writeFuture;
- try {
- spill = new RandomAccessFile(file.getFile(), "r");
- if (ch.getPipeline().get(SslHandler.class) == null) {
- final FadvisedFileRegion filePart = new FadvisedFileRegion(spill,
- file.startOffset(), file.length(), manageOsCache, readaheadLength,
- readaheadPool, file.getFile().getAbsolutePath());
- writeFuture = ch.write(filePart);
- writeFuture.addListener(new FileCloseListener(filePart, requestUri, startTime, TajoPullServerService.this));
- } else {
- // HTTPS cannot be done with zero copy.
- final FadvisedChunkedFile chunk = new FadvisedChunkedFile(spill,
- file.startOffset(), file.length(), sslFileBufferSize,
- manageOsCache, readaheadLength, readaheadPool,
- file.getFile().getAbsolutePath());
- writeFuture = ch.write(chunk);
- }
- } catch (FileNotFoundException e) {
- LOG.info(file.getFile() + " not found");
- return null;
- } catch (Throwable e) {
- if (spill != null) {
- //should close a opening file
- spill.close();
- }
- return null;
- }
- metrics.shuffleConnections.incr();
- metrics.shuffleOutputBytes.incr(file.length()); // optimistic
- return writeFuture;
- }
-
- private void sendError(ChannelHandlerContext ctx,
- HttpResponseStatus status) {
- sendError(ctx, "", status);
- }
-
- private void sendError(ChannelHandlerContext ctx, String message,
- HttpResponseStatus status) {
- HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status);
- response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8");
- response.setContent(
- ChannelBuffers.copiedBuffer(message, CharsetUtil.UTF_8));
-
- // Close the connection as soon as the error message is sent.
- ctx.getChannel().write(response).addListener(ChannelFutureListener.CLOSE);
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
- throws Exception {
- LOG.error(e.getCause().getMessage(), e.getCause());
- //if channel.close() is not called, never closed files in this request
- if (ctx.getChannel().isConnected()){
- ctx.getChannel().close();
- }
- }
- }
-
- public static FileChunk getFileCunks(Path outDir,
- String startKey,
- String endKey,
- boolean last) throws IOException {
- BSTIndex index = new BSTIndex(new TajoConf());
- BSTIndex.BSTIndexReader idxReader =
- index.getIndexReader(new Path(outDir, "index"));
- idxReader.open();
- Schema keySchema = idxReader.getKeySchema();
- TupleComparator comparator = idxReader.getComparator();
-
- LOG.info("BSTIndex is loaded from disk (" + idxReader.getFirstKey() + ", "
- + idxReader.getLastKey());
-
- File data = new File(URI.create(outDir.toUri() + "/output"));
- byte [] startBytes = Base64.decodeBase64(startKey);
- byte [] endBytes = Base64.decodeBase64(endKey);
-
- RowStoreDecoder decoder = RowStoreUtil.createDecoder(keySchema);
- Tuple start;
- Tuple end;
- try {
- start = decoder.toTuple(startBytes);
- } catch (Throwable t) {
- throw new IllegalArgumentException("StartKey: " + startKey
- + ", decoded byte size: " + startBytes.length, t);
- }
-
- try {
- end = decoder.toTuple(endBytes);
- } catch (Throwable t) {
- throw new IllegalArgumentException("EndKey: " + endKey
- + ", decoded byte size: " + endBytes.length, t);
- }
-
- LOG.info("GET Request for " + data.getAbsolutePath() + " (start="+start+", end="+ end +
- (last ? ", last=true" : "") + ")");
-
- if (idxReader.getFirstKey() == null && idxReader.getLastKey() == null) { // if # of rows is zero
- LOG.info("There is no contents");
- return null;
- }
-
- if (comparator.compare(end, idxReader.getFirstKey()) < 0 ||
- comparator.compare(idxReader.getLastKey(), start) < 0) {
- LOG.warn("Out of Scope (indexed data [" + idxReader.getFirstKey() + ", " + idxReader.getLastKey() +
- "], but request start:" + start + ", end: " + end);
- return null;
- }
-
- long startOffset;
- long endOffset;
- try {
- startOffset = idxReader.find(start);
- } catch (IOException ioe) {
- LOG.error("State Dump (the requested range: "
- + "[" + start + ", " + end +")" + ", idx min: "
- + idxReader.getFirstKey() + ", idx max: "
- + idxReader.getLastKey());
- throw ioe;
- }
- try {
- endOffset = idxReader.find(end);
- if (endOffset == -1) {
- endOffset = idxReader.find(end, true);
- }
- } catch (IOException ioe) {
- LOG.error("State Dump (the requested range: "
- + "[" + start + ", " + end +")" + ", idx min: "
- + idxReader.getFirstKey() + ", idx max: "
- + idxReader.getLastKey());
- throw ioe;
- }
-
- // if startOffset == -1 then case 2-1 or case 3
- if (startOffset == -1) { // this is a hack
- // if case 2-1 or case 3
- try {
- startOffset = idxReader.find(start, true);
- } catch (IOException ioe) {
- LOG.error("State Dump (the requested range: "
- + "[" + start + ", " + end +")" + ", idx min: "
- + idxReader.getFirstKey() + ", idx max: "
- + idxReader.getLastKey());
- throw ioe;
- }
- }
-
- if (startOffset == -1) {
- throw new IllegalStateException("startOffset " + startOffset + " is negative \n" +
- "State Dump (the requested range: "
- + "[" + start + ", " + end +")" + ", idx min: " + idxReader.getFirstKey() + ", idx max: "
- + idxReader.getLastKey());
- }
-
- // if greater than indexed values
- if (last || (endOffset == -1
- && comparator.compare(idxReader.getLastKey(), end) < 0)) {
- endOffset = data.length();
- }
-
- idxReader.close();
-
- FileChunk chunk = new FileChunk(data, startOffset, endOffset - startOffset);
- LOG.info("Retrieve File Chunk: " + chunk);
- return chunk;
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/AdvancedDataRetriever.java
----------------------------------------------------------------------
diff --git a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/AdvancedDataRetriever.java b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/AdvancedDataRetriever.java
deleted file mode 100644
index 67e7423..0000000
--- a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/AdvancedDataRetriever.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.pullserver.retriever;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.QueryUnitAttemptId;
-import org.apache.tajo.QueryUnitId;
-import org.apache.tajo.pullserver.FileAccessForbiddenException;
-import org.apache.tajo.util.TajoIdUtils;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.handler.codec.http.HttpRequest;
-import org.jboss.netty.handler.codec.http.QueryStringDecoder;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-public class AdvancedDataRetriever implements DataRetriever {
- private final Log LOG = LogFactory.getLog(AdvancedDataRetriever.class);
- private final Map<String, RetrieverHandler> handlerMap = Maps.newConcurrentMap();
-
- public AdvancedDataRetriever() {
- }
-
- public void register(String taskAttemptId, RetrieverHandler handler) {
- synchronized (handlerMap) {
- if (!handlerMap.containsKey(taskAttemptId)) {
- handlerMap.put(taskAttemptId, handler);
- }
- }
- }
-
- public void unregister(String taskAttemptId) {
- synchronized (handlerMap) {
- if (handlerMap.containsKey(taskAttemptId)) {
- handlerMap.remove(taskAttemptId);
- }
- }
- }
-
- @Override
- public FileChunk [] handle(ChannelHandlerContext ctx, HttpRequest request)
- throws IOException {
-
- final Map<String, List<String>> params =
- new QueryStringDecoder(request.getUri()).getParameters();
-
- if (!params.containsKey("qid")) {
- throw new FileNotFoundException("No such qid: " + params.containsKey("qid"));
- }
-
- if (params.containsKey("sid")) {
- List<FileChunk> chunks = Lists.newArrayList();
- List<String> queryUnidIds = splitMaps(params.get("qid"));
- for (String eachQueryUnitId : queryUnidIds) {
- String[] queryUnitIdSeqTokens = eachQueryUnitId.split("_");
- ExecutionBlockId ebId = TajoIdUtils.createExecutionBlockId(params.get("sid").get(0));
- QueryUnitId quid = new QueryUnitId(ebId, Integer.parseInt(queryUnitIdSeqTokens[0]));
-
- QueryUnitAttemptId attemptId = new QueryUnitAttemptId(quid, Integer.parseInt(queryUnitIdSeqTokens[1]));
-
- RetrieverHandler handler = handlerMap.get(attemptId.toString());
- FileChunk chunk = handler.get(params);
- chunks.add(chunk);
- }
- return chunks.toArray(new FileChunk[chunks.size()]);
- } else {
- RetrieverHandler handler = handlerMap.get(params.get("qid").get(0));
- FileChunk chunk = handler.get(params);
- if (chunk == null) {
- if (params.containsKey("qid")) { // if there is no content corresponding to the query
- return null;
- } else { // if there is no
- throw new FileNotFoundException("No such a file corresponding to " + params.get("qid"));
- }
- }
-
- File file = chunk.getFile();
- if (file.isHidden() || !file.exists()) {
- throw new FileNotFoundException("No such file: " + file.getAbsolutePath());
- }
- if (!file.isFile()) {
- throw new FileAccessForbiddenException(file.getAbsolutePath() + " is not file");
- }
-
- return new FileChunk[] {chunk};
- }
- }
-
- private List<String> splitMaps(List<String> qids) {
- if (null == qids) {
- LOG.error("QueryUnitId is EMPTY");
- return null;
- }
-
- final List<String> ret = new ArrayList<String>();
- for (String qid : qids) {
- Collections.addAll(ret, qid.split(","));
- }
- return ret;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/DataRetriever.java
----------------------------------------------------------------------
diff --git a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/DataRetriever.java b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/DataRetriever.java
deleted file mode 100644
index 8f55f7b..0000000
--- a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/DataRetriever.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.pullserver.retriever;
-
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.handler.codec.http.HttpRequest;
-
-import java.io.IOException;
-
-public interface DataRetriever {
- FileChunk [] handle(ChannelHandlerContext ctx, HttpRequest request)
- throws IOException;
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/DirectoryRetriever.java
----------------------------------------------------------------------
diff --git a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/DirectoryRetriever.java b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/DirectoryRetriever.java
deleted file mode 100644
index dc63929..0000000
--- a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/DirectoryRetriever.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.pullserver.retriever;
-
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.handler.codec.http.HttpRequest;
-import org.apache.tajo.pullserver.FileAccessForbiddenException;
-import org.apache.tajo.pullserver.HttpDataServerHandler;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-
-public class DirectoryRetriever implements DataRetriever {
- public String baseDir;
-
- public DirectoryRetriever(String baseDir) {
- this.baseDir = baseDir;
- }
-
- @Override
- public FileChunk [] handle(ChannelHandlerContext ctx, HttpRequest request)
- throws IOException {
- final String path = HttpDataServerHandler.sanitizeUri(request.getUri());
- if (path == null) {
- throw new IllegalArgumentException("Wrong path: " +path);
- }
-
- File file = new File(baseDir, path);
- if (file.isHidden() || !file.exists()) {
- throw new FileNotFoundException("No such file: " + baseDir + "/" + path);
- }
- if (!file.isFile()) {
- throw new FileAccessForbiddenException("No such file: "
- + baseDir + "/" + path);
- }
-
- return new FileChunk[] {new FileChunk(file, 0, file.length())};
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/FileChunk.java
----------------------------------------------------------------------
diff --git a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/FileChunk.java b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/FileChunk.java
deleted file mode 100644
index 67cff21..0000000
--- a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/FileChunk.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.pullserver.retriever;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-
-public class FileChunk {
- private final File file;
- private final long startOffset;
- private long length;
-
- /**
- * TRUE if this.file is created by getting data from a remote host (e.g., by HttpRequest). FALSE otherwise.
- */
- private boolean fromRemote;
-
- /**
- * ExecutionBlockId
- */
- private String ebId;
-
- public FileChunk(File file, long startOffset, long length) throws FileNotFoundException {
- this.file = file;
- this.startOffset = startOffset;
- this.length = length;
- }
-
- public File getFile() {
- return this.file;
- }
-
- public long startOffset() {
- return this.startOffset;
- }
-
- public long length() {
- return this.length;
- }
-
- public void setLength(long newLength) {
- this.length = newLength;
- }
-
- public boolean fromRemote() {
- return this.fromRemote;
- }
-
- public void setFromRemote(boolean newVal) {
- this.fromRemote = newVal;
- }
-
- public String getEbId() {
- return this.ebId;
- }
-
- public void setEbId(String newVal) {
- this.ebId = newVal;
- }
-
- public String toString() {
- return " (start=" + startOffset() + ", length=" + length + ", fromRemote=" + fromRemote + ", ebId=" + ebId + ") "
- + file.getAbsolutePath();
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/RetrieverHandler.java
----------------------------------------------------------------------
diff --git a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/RetrieverHandler.java b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/RetrieverHandler.java
deleted file mode 100644
index 5567c0d..0000000
--- a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/RetrieverHandler.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.pullserver.retriever;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-public interface RetrieverHandler {
- /**
- *
- * @param kvs url-decoded key/value pairs
- * @return a desired part of a file
- * @throws java.io.IOException
- */
- public FileChunk get(Map<String, List<String>> kvs) throws IOException;
-}
[3/3] tajo git commit: TAJO-1229: rename tajo-yarn-pullserver to
tajo-pullserver.
Posted by hy...@apache.org.
TAJO-1229: rename tajo-yarn-pullserver to tajo-pullserver.
Closes #284
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/b5aa7804
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/b5aa7804
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/b5aa7804
Branch: refs/heads/master
Commit: b5aa780460fcfbf657541ee6c94d41b34b1b24b9
Parents: facd1dd
Author: Hyunsik Choi <hy...@apache.org>
Authored: Mon Dec 8 17:27:16 2014 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Mon Dec 8 17:27:16 2014 +0900
----------------------------------------------------------------------
CHANGES | 3 +
pom.xml | 2 +-
tajo-pullserver/pom.xml | 146 ++++
.../tajo/pullserver/FadvisedChunkedFile.java | 81 ++
.../tajo/pullserver/FadvisedFileRegion.java | 170 ++++
.../FileAccessForbiddenException.java | 40 +
.../tajo/pullserver/FileCloseListener.java | 53 ++
.../tajo/pullserver/HttpDataServerHandler.java | 245 ++++++
.../HttpDataServerPipelineFactory.java | 56 ++
.../org/apache/tajo/pullserver/HttpUtil.java | 69 ++
.../tajo/pullserver/PullServerAuxService.java | 654 +++++++++++++++
.../apache/tajo/pullserver/PullServerUtil.java | 90 +++
.../apache/tajo/pullserver/TajoPullServer.java | 73 ++
.../tajo/pullserver/TajoPullServerService.java | 808 +++++++++++++++++++
.../retriever/AdvancedDataRetriever.java | 126 +++
.../pullserver/retriever/DataRetriever.java | 29 +
.../retriever/DirectoryRetriever.java | 56 ++
.../tajo/pullserver/retriever/FileChunk.java | 81 ++
.../pullserver/retriever/RetrieverHandler.java | 33 +
tajo-yarn-pullserver/pom.xml | 146 ----
.../tajo/pullserver/FadvisedChunkedFile.java | 81 --
.../tajo/pullserver/FadvisedFileRegion.java | 170 ----
.../FileAccessForbiddenException.java | 40 -
.../tajo/pullserver/FileCloseListener.java | 53 --
.../tajo/pullserver/HttpDataServerHandler.java | 245 ------
.../HttpDataServerPipelineFactory.java | 56 --
.../org/apache/tajo/pullserver/HttpUtil.java | 69 --
.../tajo/pullserver/PullServerAuxService.java | 654 ---------------
.../apache/tajo/pullserver/PullServerUtil.java | 90 ---
.../apache/tajo/pullserver/TajoPullServer.java | 73 --
.../tajo/pullserver/TajoPullServerService.java | 808 -------------------
.../retriever/AdvancedDataRetriever.java | 126 ---
.../pullserver/retriever/DataRetriever.java | 29 -
.../retriever/DirectoryRetriever.java | 56 --
.../tajo/pullserver/retriever/FileChunk.java | 81 --
.../pullserver/retriever/RetrieverHandler.java | 33 -
36 files changed, 2814 insertions(+), 2811 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index acc72b9..c84992b 100644
--- a/CHANGES
+++ b/CHANGES
@@ -154,6 +154,9 @@ Release 0.9.1 - unreleased
TASKS
+ TAJO-1229: rename tajo-yarn-pullserver to tajo-pullserver.
+ (hyunsik)
+
TAJO-1157: Required Java version in tutorial doc needs to be updated.
(hyunsik)
http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 3dca9c0..62e03f7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -89,7 +89,7 @@
<module>tajo-client</module>
<module>tajo-jdbc</module>
<module>tajo-storage</module>
- <module>tajo-yarn-pullserver</module>
+ <module>tajo-pullserver</module>
<module>tajo-dist</module>
<module>tajo-thirdparty/asm</module>
</modules>
http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/tajo-pullserver/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-pullserver/pom.xml b/tajo-pullserver/pom.xml
new file mode 100644
index 0000000..a7644a1
--- /dev/null
+++ b/tajo-pullserver/pom.xml
@@ -0,0 +1,146 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>tajo-project</artifactId>
+ <groupId>org.apache.tajo</groupId>
+ <version>0.9.1-SNAPSHOT</version>
+ <relativePath>../tajo-project</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+ <name>Tajo Core PullServer</name>
+ <artifactId>tajo-yarn-pullserver</artifactId>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.rat</groupId>
+ <artifactId>apache-rat-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>verify</phase>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.tajo</groupId>
+ <artifactId>tajo-rpc</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tajo</groupId>
+ <artifactId>tajo-catalog-common</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tajo</groupId>
+ <artifactId>tajo-storage</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-server-nodemanager</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-common</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-shuffle</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>commons-el</groupId>
+ <artifactId>commons-el</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>tomcat</groupId>
+ <artifactId>jasper-runtime</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>tomcat</groupId>
+ <artifactId>jasper-compiler</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jsp-2.1-jetty</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ </dependencies>
+
+ <profiles>
+ <profile>
+ <id>docs</id>
+ <activation>
+ <activeByDefault>false</activeByDefault>
+ </activation>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-javadoc-plugin</artifactId>
+ <executions>
+ <execution>
+ <!-- build javadoc jars per jar for publishing to maven -->
+ <id>module-javadocs</id>
+ <phase>package</phase>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ <configuration>
+ <destDir>${project.build.directory}</destDir>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
+
+ <reporting>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-project-info-reports-plugin</artifactId>
+ <version>2.4</version>
+ <configuration>
+ <dependencyLocationsEnabled>false</dependencyLocationsEnabled>
+ </configuration>
+ </plugin>
+ </plugins>
+ </reporting>
+
+</project>
http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedChunkedFile.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedChunkedFile.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedChunkedFile.java
new file mode 100644
index 0000000..b0b8d18
--- /dev/null
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedChunkedFile.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.pullserver;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.ReadaheadPool;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.jboss.netty.handler.stream.ChunkedFile;
+
+import java.io.FileDescriptor;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+
+public class FadvisedChunkedFile extends ChunkedFile {
+
+ private static final Log LOG = LogFactory.getLog(FadvisedChunkedFile.class);
+
+ private final boolean manageOsCache;
+ private final int readaheadLength;
+ private final ReadaheadPool readaheadPool;
+ private final FileDescriptor fd;
+ private final String identifier;
+
+ private ReadaheadPool.ReadaheadRequest readaheadRequest;
+
+ public FadvisedChunkedFile(RandomAccessFile file, long position, long count,
+ int chunkSize, boolean manageOsCache, int readaheadLength,
+ ReadaheadPool readaheadPool, String identifier) throws IOException {
+ super(file, position, count, chunkSize);
+ this.manageOsCache = manageOsCache;
+ this.readaheadLength = readaheadLength;
+ this.readaheadPool = readaheadPool;
+ this.fd = file.getFD();
+ this.identifier = identifier;
+ }
+
+ @Override
+ public Object nextChunk() throws Exception {
+ if (PullServerUtil.isNativeIOPossible() && manageOsCache && readaheadPool != null) {
+ readaheadRequest = readaheadPool
+ .readaheadStream(identifier, fd, getCurrentOffset(), readaheadLength,
+ getEndOffset(), readaheadRequest);
+ }
+ return super.nextChunk();
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (readaheadRequest != null) {
+ readaheadRequest.cancel();
+ }
+ if (PullServerUtil.isNativeIOPossible() && manageOsCache && getEndOffset() - getStartOffset() > 0) {
+ try {
+ PullServerUtil.posixFadviseIfPossible(identifier,
+ fd,
+ getStartOffset(), getEndOffset() - getStartOffset(),
+ NativeIO.POSIX.POSIX_FADV_DONTNEED);
+ } catch (Throwable t) {
+ LOG.warn("Failed to manage OS cache for " + identifier, t);
+ }
+ }
+ super.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedFileRegion.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedFileRegion.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedFileRegion.java
new file mode 100644
index 0000000..18cf4b6
--- /dev/null
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedFileRegion.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.pullserver;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.ReadaheadPool;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.jboss.netty.channel.DefaultFileRegion;
+
+import java.io.FileDescriptor;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.WritableByteChannel;
+
+public class FadvisedFileRegion extends DefaultFileRegion {
+
+ private static final Log LOG = LogFactory.getLog(FadvisedFileRegion.class);
+
+ private final boolean manageOsCache;
+ private final int readaheadLength;
+ private final ReadaheadPool readaheadPool;
+ private final FileDescriptor fd;
+ private final String identifier;
+ private final long count;
+ private final long position;
+ private final int shuffleBufferSize;
+ private final boolean shuffleTransferToAllowed;
+ private final FileChannel fileChannel;
+
+ private ReadaheadPool.ReadaheadRequest readaheadRequest;
+ public static final int DEFAULT_SHUFFLE_BUFFER_SIZE = 128 * 1024;
+
+ public FadvisedFileRegion(RandomAccessFile file, long position, long count,
+ boolean manageOsCache, int readaheadLength, ReadaheadPool readaheadPool,
+ String identifier) throws IOException {
+ this(file, position, count, manageOsCache, readaheadLength, readaheadPool,
+ identifier, DEFAULT_SHUFFLE_BUFFER_SIZE, true);
+ }
+
+ public FadvisedFileRegion(RandomAccessFile file, long position, long count,
+ boolean manageOsCache, int readaheadLength, ReadaheadPool readaheadPool,
+ String identifier, int shuffleBufferSize,
+ boolean shuffleTransferToAllowed) throws IOException {
+ super(file.getChannel(), position, count);
+ this.manageOsCache = manageOsCache;
+ this.readaheadLength = readaheadLength;
+ this.readaheadPool = readaheadPool;
+ this.fd = file.getFD();
+ this.identifier = identifier;
+ this.fileChannel = file.getChannel();
+ this.count = count;
+ this.position = position;
+ this.shuffleBufferSize = shuffleBufferSize;
+ this.shuffleTransferToAllowed = shuffleTransferToAllowed;
+ }
+
+ @Override
+ public long transferTo(WritableByteChannel target, long position)
+ throws IOException {
+ if (PullServerUtil.isNativeIOPossible() && manageOsCache && readaheadPool != null) {
+ readaheadRequest = readaheadPool.readaheadStream(identifier, fd,
+ getPosition() + position, readaheadLength,
+ getPosition() + getCount(), readaheadRequest);
+ }
+
+ if(this.shuffleTransferToAllowed) {
+ return super.transferTo(target, position);
+ } else {
+ return customShuffleTransfer(target, position);
+ }
+ }
+
+ /**
+ * This method transfers data using local buffer. It transfers data from
+ * a disk to a local buffer in memory, and then it transfers data from the
+ * buffer to the target. This is used only if transferTo is disallowed in
+ * the configuration file. super.TransferTo does not perform well on Windows
+ * due to a small IO request generated. customShuffleTransfer can control
+ * the size of the IO requests by changing the size of the intermediate
+ * buffer.
+ */
+ @VisibleForTesting
+ long customShuffleTransfer(WritableByteChannel target, long position)
+ throws IOException {
+ long actualCount = this.count - position;
+ if (actualCount < 0 || position < 0) {
+ throw new IllegalArgumentException(
+ "position out of range: " + position +
+ " (expected: 0 - " + (this.count - 1) + ')');
+ }
+ if (actualCount == 0) {
+ return 0L;
+ }
+
+ long trans = actualCount;
+ int readSize;
+ ByteBuffer byteBuffer = ByteBuffer.allocate(this.shuffleBufferSize);
+
+ while(trans > 0L &&
+ (readSize = fileChannel.read(byteBuffer, this.position+position)) > 0) {
+ //adjust counters and buffer limit
+ if(readSize < trans) {
+ trans -= readSize;
+ position += readSize;
+ byteBuffer.flip();
+ } else {
+ //We can read more than we need if the actualCount is not multiple
+ //of the byteBuffer size and file is big enough. In that case we cannot
+ //use flip method but we need to set buffer limit manually to trans.
+ byteBuffer.limit((int)trans);
+ byteBuffer.position(0);
+ position += trans;
+ trans = 0;
+ }
+
+ //write data to the target
+ while(byteBuffer.hasRemaining()) {
+ target.write(byteBuffer);
+ }
+
+ byteBuffer.clear();
+ }
+
+ return actualCount - trans;
+ }
+
+
+ @Override
+ public void releaseExternalResources() {
+ if (readaheadRequest != null) {
+ readaheadRequest.cancel();
+ }
+ super.releaseExternalResources();
+ }
+
+ /**
+ * Call when the transfer completes successfully so we can advise the OS that
+ * we don't need the region to be cached anymore.
+ */
+ public void transferSuccessful() {
+ if (PullServerUtil.isNativeIOPossible() && manageOsCache && getCount() > 0) {
+ try {
+ PullServerUtil.posixFadviseIfPossible(identifier, fd, getPosition(), getCount(),
+ NativeIO.POSIX.POSIX_FADV_DONTNEED);
+ } catch (Throwable t) {
+ LOG.warn("Failed to manage OS cache for " + identifier, t);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FileAccessForbiddenException.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FileAccessForbiddenException.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FileAccessForbiddenException.java
new file mode 100644
index 0000000..c703f6f
--- /dev/null
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FileAccessForbiddenException.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.pullserver;
+
+import java.io.IOException;
+
+public class FileAccessForbiddenException extends IOException {
+ private static final long serialVersionUID = -3383272565826389213L;
+
+ public FileAccessForbiddenException() {
+ }
+
+ public FileAccessForbiddenException(String message) {
+ super(message);
+ }
+
+ public FileAccessForbiddenException(Throwable cause) {
+ super(cause);
+ }
+
+ public FileAccessForbiddenException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FileCloseListener.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FileCloseListener.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FileCloseListener.java
new file mode 100644
index 0000000..236db89
--- /dev/null
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FileCloseListener.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.pullserver;
+
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+
+public class FileCloseListener implements ChannelFutureListener {
+
+ private FadvisedFileRegion filePart;
+ private String requestUri;
+ private TajoPullServerService pullServerService;
+ private long startTime;
+
+ public FileCloseListener(FadvisedFileRegion filePart,
+ String requestUri,
+ long startTime,
+ TajoPullServerService pullServerService) {
+ this.filePart = filePart;
+ this.requestUri = requestUri;
+ this.pullServerService = pullServerService;
+ this.startTime = startTime;
+ }
+
+ // TODO error handling; distinguish IO/connection failures,
+ // attribute to appropriate spill output
+ @Override
+ public void operationComplete(ChannelFuture future) {
+ if(future.isSuccess()){
+ filePart.transferSuccessful();
+ }
+ filePart.releaseExternalResources();
+ if (pullServerService != null) {
+ pullServerService.completeFileChunk(filePart, requestUri, startTime);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerHandler.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerHandler.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerHandler.java
new file mode 100644
index 0000000..31db15c
--- /dev/null
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerHandler.java
@@ -0,0 +1,245 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.pullserver;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.pullserver.retriever.DataRetriever;
+import org.apache.tajo.pullserver.retriever.FileChunk;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.*;
+import org.jboss.netty.handler.codec.frame.TooLongFrameException;
+import org.jboss.netty.handler.codec.http.*;
+import org.jboss.netty.handler.ssl.SslHandler;
+import org.jboss.netty.handler.stream.ChunkedFile;
+import org.jboss.netty.util.CharsetUtil;
+
+import java.io.*;
+import java.net.URLDecoder;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
+import static org.jboss.netty.handler.codec.http.HttpHeaders.isKeepAlive;
+import static org.jboss.netty.handler.codec.http.HttpHeaders.setContentLength;
+import static org.jboss.netty.handler.codec.http.HttpMethod.GET;
+import static org.jboss.netty.handler.codec.http.HttpResponseStatus.*;
+import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;
+
+// Upstream Netty (3.x) handler that serves intermediate shuffle output files
+// over HTTP GET. Requests identify data by subquery id ("sid"), partition id
+// ("p") and a comma-separated task-attempt list ("ta"); the matching files are
++// streamed back, zero-copy when no SSL handler is in the pipeline.
+public class HttpDataServerHandler extends SimpleChannelUpstreamHandler {
+ private final static Log LOG = LogFactory.getLog(HttpDataServerHandler.class);
+
+ Map<ExecutionBlockId, DataRetriever> retrievers =
+ new ConcurrentHashMap<ExecutionBlockId, DataRetriever>();
+ private String userName;
+ private String appId;
+
+ public HttpDataServerHandler(String userName, String appId) {
+ this.userName= userName;
+ this.appId = appId;
+ }
+
+ // Handles one GET request: resolves each requested chunk to a file under
+ // <usercache>/<user>/<appcache>/<appId>/output/<sid>/<ta>/output/<p>
+ // and writes all chunks back on the same channel.
+ // NOTE(review): params.get("sid")/get("p")/splitMaps(params.get("ta")) are
+ // dereferenced without null checks — a request missing any of these
+ // parameters appears to throw NPE here rather than returning BAD_REQUEST;
+ // confirm against actual fetcher behavior.
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+ throws Exception {
+ HttpRequest request = (HttpRequest) e.getMessage();
+ if (request.getMethod() != GET) {
+ sendError(ctx, METHOD_NOT_ALLOWED);
+ return;
+ }
+
+ String base =
+ ContainerLocalizer.USERCACHE + "/" + userName + "/"
+ + ContainerLocalizer.APPCACHE + "/"
+ + appId + "/output" + "/";
+
+ final Map<String, List<String>> params =
+ new QueryStringDecoder(request.getUri()).getParameters();
+
+ List<FileChunk> chunks = Lists.newArrayList();
+ List<String> taskIds = splitMaps(params.get("ta"));
+ int sid = Integer.valueOf(params.get("sid").get(0));
+ int partitionId = Integer.valueOf(params.get("p").get(0));
+ for (String ta : taskIds) {
+
+ // NOTE(review): file.length() is 0 for a missing file, so nonexistent
+ // chunks silently become empty rather than a NOT_FOUND response.
+ File file = new File(base + "/" + sid + "/" + ta + "/output/" + partitionId);
+ FileChunk chunk = new FileChunk(file, 0, file.length());
+ chunks.add(chunk);
+ }
+
+ FileChunk[] file = chunks.toArray(new FileChunk[chunks.size()]);
+// try {
+// file = retriever.handle(ctx, request);
+// } catch (FileNotFoundException fnf) {
+// LOG.error(fnf);
+// sendError(ctx, NOT_FOUND);
+// return;
+// } catch (IllegalArgumentException iae) {
+// LOG.error(iae);
+// sendError(ctx, BAD_REQUEST);
+// return;
+// } catch (FileAccessForbiddenException fafe) {
+// LOG.error(fafe);
+// sendError(ctx, FORBIDDEN);
+// return;
+// } catch (IOException ioe) {
+// LOG.error(ioe);
+// sendError(ctx, INTERNAL_SERVER_ERROR);
+// return;
+// }
+
+ // Write the content.
+ Channel ch = e.getChannel();
+ // NOTE(review): `file` is produced by toArray() above and can never be
+ // null here — this branch looks unreachable (leftover from the
+ // commented-out retriever path).
+ if (file == null) {
+ HttpResponse response = new DefaultHttpResponse(HTTP_1_1, NO_CONTENT);
+ ch.write(response);
+ if (!isKeepAlive(request)) {
+ ch.close();
+ }
+ } else {
+ HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
+ long totalSize = 0;
+ for (FileChunk chunk : file) {
+ totalSize += chunk.length();
+ }
+ setContentLength(response, totalSize);
+
+ // Write the initial line and the header.
+ ch.write(response);
+
+ ChannelFuture writeFuture = null;
+
+ for (FileChunk chunk : file) {
+ writeFuture = sendFile(ctx, ch, chunk);
+ if (writeFuture == null) {
+ // Content-Length has already been written at this point; the
+ // error response is appended to a partially-sent body.
+ sendError(ctx, NOT_FOUND);
+ return;
+ }
+ }
+
+ // Decide whether to close the connection or not.
+ if (!isKeepAlive(request)) {
+ // Close the connection when the whole content is written out.
+ writeFuture.addListener(ChannelFutureListener.CLOSE);
+ }
+ }
+ }
+
+ // Streams one chunk on the channel. Returns the write future, or null if
+ // the backing file does not exist (callers translate null to NOT_FOUND).
+ private ChannelFuture sendFile(ChannelHandlerContext ctx,
+ Channel ch,
+ FileChunk file) throws IOException {
+ RandomAccessFile raf;
+ try {
+ raf = new RandomAccessFile(file.getFile(), "r");
+ } catch (FileNotFoundException fnfe) {
+ return null;
+ }
+
+ ChannelFuture writeFuture;
+ if (ch.getPipeline().get(SslHandler.class) != null) {
+ // Cannot use zero-copy with HTTPS.
+ writeFuture = ch.write(new ChunkedFile(raf, file.startOffset(),
+ file.length(), 8192));
+ } else {
+ // No encryption - use zero-copy.
+ final FileRegion region = new DefaultFileRegion(raf.getChannel(),
+ file.startOffset(), file.length());
+ writeFuture = ch.write(region);
+ writeFuture.addListener(new ChannelFutureListener() {
+ public void operationComplete(ChannelFuture future) {
+ region.releaseExternalResources();
+ }
+ });
+ }
+
+ return writeFuture;
+ }
+
+ // Maps decoder framing problems to 400; everything else to 500.
+ // NOTE(review): cause.printStackTrace() bypasses the LOG — presumably
+ // should use LOG.error instead.
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
+ throws Exception {
+ Channel ch = e.getChannel();
+ Throwable cause = e.getCause();
+ if (cause instanceof TooLongFrameException) {
+ sendError(ctx, BAD_REQUEST);
+ return;
+ }
+
+ cause.printStackTrace();
+ if (ch.isConnected()) {
+ sendError(ctx, INTERNAL_SERVER_ERROR);
+ }
+ }
+
+ // URL-decodes a URI and rejects paths containing "." components.
+ // Returns null when the path looks like a traversal attempt.
+ // NOTE(review): not referenced anywhere in this class — appears to be
+ // dead code kept from the Netty file-server example.
+ public static String sanitizeUri(String uri) {
+ // Decode the path.
+ try {
+ uri = URLDecoder.decode(uri, "UTF-8");
+ } catch (UnsupportedEncodingException e) {
+ try {
+ uri = URLDecoder.decode(uri, "ISO-8859-1");
+ } catch (UnsupportedEncodingException e1) {
+ throw new Error();
+ }
+ }
+
+ // Convert file separators.
+ uri = uri.replace('/', File.separatorChar);
+
+ // Simplistic dumb security check.
+ // You will have to do something serious in the production environment.
+ if (uri.contains(File.separator + ".")
+ || uri.contains("." + File.separator) || uri.startsWith(".")
+ || uri.endsWith(".")) {
+ return null;
+ }
+
+ // Convert to absolute path.
+ return uri;
+ }
+
+ // Writes a plain-text error response and closes the connection afterwards.
+ private void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) {
+ HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status);
+ response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8");
+ response.setContent(ChannelBuffers.copiedBuffer(
+ "Failure: " + status.toString() + "\r\n", CharsetUtil.UTF_8));
+
+ // Close the connection as soon as the error message is sent.
+ ctx.getChannel().write(response).addListener(ChannelFutureListener.CLOSE);
+ }
+
+ // Expands each comma-separated "ta" query value into individual task ids.
+ // Returns null (not an empty list) when the parameter is absent.
+ private List<String> splitMaps(List<String> qids) {
+ if (null == qids) {
+ LOG.error("QueryUnitId is EMPTY");
+ return null;
+ }
+
+ final List<String> ret = new ArrayList<String>();
+ for (String qid : qids) {
+ Collections.addAll(ret, qid.split(","));
+ }
+ return ret;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerPipelineFactory.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerPipelineFactory.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerPipelineFactory.java
new file mode 100644
index 0000000..4c8bd8b
--- /dev/null
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerPipelineFactory.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.pullserver;
+
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.handler.codec.http.HttpContentCompressor;
+import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
+import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
+import org.jboss.netty.handler.stream.ChunkedWriteHandler;
+
+import static org.jboss.netty.channel.Channels.pipeline;
+
+// Builds the Netty pipeline for the HTTP data server: standard HTTP
+// decode/encode, chunked writes, compression, then the per-connection
+// HttpDataServerHandler bound to a specific user and application.
+public class HttpDataServerPipelineFactory implements ChannelPipelineFactory {
+ private String userName;
+ private String appId;
+ public HttpDataServerPipelineFactory(String userName, String appId) {
+ this.userName = userName;
+ this.appId = appId;
+ }
+
+ public ChannelPipeline getPipeline() throws Exception {
+ // Create a default pipeline implementation.
+ ChannelPipeline pipeline = pipeline();
+
+ // Uncomment the following line if you want HTTPS
+ // SSLEngine engine =
+ // SecureChatSslContextFactory.getServerContext().createSSLEngine();
+ // engine.setUseClientMode(false);
+ // pipeline.addLast("ssl", new SslHandler(engine));
+
+ pipeline.addLast("decoder", new HttpRequestDecoder());
+ //pipeline.addLast("aggregator", new HttpChunkAggregator(65536));
+ pipeline.addLast("encoder", new HttpResponseEncoder());
+ pipeline.addLast("chunkedWriter", new ChunkedWriteHandler());
+ pipeline.addLast("deflater", new HttpContentCompressor());
+ // A new handler instance per pipeline; the handler itself is stateless
+ // beyond the user/app identifiers captured here.
+ pipeline.addLast("handler", new HttpDataServerHandler(userName, appId));
+ return pipeline;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpUtil.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpUtil.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpUtil.java
new file mode 100644
index 0000000..2cbb101
--- /dev/null
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpUtil.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.pullserver;
+
+import com.google.common.collect.Maps;
+
+import java.io.UnsupportedEncodingException;
+import java.net.URI;
+import java.net.URLEncoder;
+import java.util.Map;
+
+// Small helpers for converting between query strings and key/value maps.
+// NOTE(review): parsing and building are asymmetric — buildQuery URL-encodes
+// keys and values, but getParamsFromQuery does NOT decode them.
+public class HttpUtil {
+ // Parses the (already decoded) query component of a URI into a map.
+ public static Map<String,String> getParams(URI uri) throws UnsupportedEncodingException {
+ return getParamsFromQuery(uri.getQuery());
+ }
+
+ /**
+ * It parses a query string into key/value pairs
+ *
+ * @param queryString decoded query string
+ * @return key/value pairs parsed from a given query string
+ * @throws java.io.UnsupportedEncodingException
+ */
+ public static Map<String, String> getParamsFromQuery(String queryString) throws UnsupportedEncodingException {
+ String [] queries = queryString.split("&");
+
+ Map<String,String> params = Maps.newHashMap();
+ String [] param;
+ for (String q : queries) {
+ // NOTE(review): a value-less parameter ("flag" with no '=') yields a
+ // one-element array, so param[1] would throw
+ // ArrayIndexOutOfBoundsException; duplicate keys keep the last value.
+ param = q.split("=");
+ params.put(param[0], param[1]);
+ }
+
+ return params;
+ }
+
+ // Serializes a parameter map into an application/x-www-form-urlencoded
+ // query string (no leading '?'); iteration order follows the map.
+ public static String buildQuery(Map<String,String> params) throws UnsupportedEncodingException {
+ StringBuilder sb = new StringBuilder();
+
+ boolean first = true;
+ for (Map.Entry<String,String> param : params.entrySet()) {
+ if (!first) {
+ sb.append("&");
+ }
+ sb.append(URLEncoder.encode(param.getKey(), "UTF-8")).
+ append("=").
+ append(URLEncoder.encode(param.getValue(), "UTF-8"));
+ first = false;
+ }
+
+ return sb.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java
new file mode 100644
index 0000000..1c63c8a
--- /dev/null
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java
@@ -0,0 +1,654 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.pullserver;
+
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputByteBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.ReadaheadPool;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MutableCounterInt;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
+import org.apache.hadoop.security.ssl.SSLFactory;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
+import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
+import org.apache.hadoop.yarn.server.api.AuxiliaryService;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.pullserver.retriever.FileChunk;
+import org.apache.tajo.storage.RowStoreUtil;
+import org.apache.tajo.storage.RowStoreUtil.RowStoreDecoder;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.TupleComparator;
+import org.apache.tajo.storage.index.bst.BSTIndex;
+import org.apache.tajo.util.TajoIdUtils;
+import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.*;
+import org.jboss.netty.channel.group.ChannelGroup;
+import org.jboss.netty.channel.group.DefaultChannelGroup;
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+import org.jboss.netty.handler.codec.frame.TooLongFrameException;
+import org.jboss.netty.handler.codec.http.*;
+import org.jboss.netty.handler.ssl.SslHandler;
+import org.jboss.netty.handler.stream.ChunkedWriteHandler;
+import org.jboss.netty.util.CharsetUtil;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
+import static org.jboss.netty.handler.codec.http.HttpHeaders.isKeepAlive;
+import static org.jboss.netty.handler.codec.http.HttpHeaders.setContentLength;
+import static org.jboss.netty.handler.codec.http.HttpMethod.GET;
+import static org.jboss.netty.handler.codec.http.HttpResponseStatus.*;
+import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;
+
+public class PullServerAuxService extends AuxiliaryService {
+
+ private static final Log LOG = LogFactory.getLog(PullServerAuxService.class);
+
+ public static final String SHUFFLE_MANAGE_OS_CACHE = "tajo.pullserver.manage.os.cache";
+ public static final boolean DEFAULT_SHUFFLE_MANAGE_OS_CACHE = true;
+
+ public static final String SHUFFLE_READAHEAD_BYTES = "tajo.pullserver.readahead.bytes";
+ public static final int DEFAULT_SHUFFLE_READAHEAD_BYTES = 4 * 1024 * 1024;
+
+ private int port;
+ private ChannelFactory selector;
+ private final ChannelGroup accepted = new DefaultChannelGroup();
+ private HttpPipelineFactory pipelineFact;
+ private int sslFileBufferSize;
+
+ private ApplicationId appId;
+ private QueryId queryId;
+ private FileSystem localFS;
+
+ /**
+ * Should the shuffle use posix_fadvise calls to manage the OS cache during
+ * sendfile
+ */
+ private boolean manageOsCache;
+ private int readaheadLength;
+ private ReadaheadPool readaheadPool = ReadaheadPool.getInstance();
+
+
+ public static final String PULLSERVER_SERVICEID = "tajo.pullserver";
+
+ private static final Map<String,String> userRsrc =
+ new ConcurrentHashMap<String,String>();
+ private static String userName;
+
+ public static final String SUFFLE_SSL_FILE_BUFFER_SIZE_KEY =
+ "tajo.pullserver.ssl.file.buffer.size";
+
+ public static final int DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE = 60 * 1024;
+
+ // Hadoop metrics2 source tracking shuffle output volume, per-transfer
+ // success/failure counts, and the current number of open connections.
+ // Doubles as a ChannelFutureListener so it can be attached to write futures.
+ @Metrics(name="PullServerShuffleMetrics", about="PullServer output metrics", context="tajo")
+ static class ShuffleMetrics implements ChannelFutureListener {
+ @Metric({"OutputBytes","PullServer output in bytes"})
+ MutableCounterLong shuffleOutputBytes;
+ @Metric({"Failed","# of failed shuffle outputs"})
+ MutableCounterInt shuffleOutputsFailed;
+ @Metric({"Succeeded","# of succeeded shuffle outputs"})
+ MutableCounterInt shuffleOutputsOK;
+ @Metric({"Connections","# of current shuffle connections"})
+ MutableGaugeInt shuffleConnections;
+
+ // Invoked when a shuffle write completes: classify the outcome and
+ // decrement the connection gauge incremented by sendFile().
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ if (future.isSuccess()) {
+ shuffleOutputsOK.incr();
+ } else {
+ shuffleOutputsFailed.incr();
+ }
+ shuffleConnections.decr();
+ }
+ }
+
+ final ShuffleMetrics metrics;
+
+ // Package-private constructor used for testing with an injectable
+ // MetricsSystem; registers the shuffle metrics source.
+ PullServerAuxService(MetricsSystem ms) {
+ super("httpshuffle");
+ metrics = ms.register(new ShuffleMetrics());
+ }
+
+ // Reflectively instantiated by the YARN NodeManager as an auxiliary
+ // service, hence the UnusedDeclaration suppression.
+ @SuppressWarnings("UnusedDeclaration")
+ public PullServerAuxService() {
+ this(DefaultMetricsSystem.instance());
+ }
+
+ /**
+ * Serialize the shuffle port into a ByteBuffer for use later on.
+ * @param port the port to be sent to the ApplicationMaster
+ * @return the serialized form of the port.
+ * @throws IOException if writing to the in-memory buffer fails
+ */
+ public static ByteBuffer serializeMetaData(int port) throws IOException {
+ //TODO these bytes should be versioned
+ DataOutputBuffer port_dob = new DataOutputBuffer();
+ port_dob.writeInt(port);
+ return ByteBuffer.wrap(port_dob.getData(), 0, port_dob.getLength());
+ }
+
+ /**
+ * A helper function to deserialize the metadata returned by PullServerAuxService.
+ * Inverse of {@link #serializeMetaData(int)}.
+ * @param meta the metadata returned by the PullServerAuxService
+ * @return the port the PullServer Handler is listening on to serve shuffle data.
+ * @throws IOException if the buffer does not contain a readable int
+ */
+ public static int deserializeMetaData(ByteBuffer meta) throws IOException {
+ //TODO this should be returning a class not just an int
+ DataInputByteBuffer in = new DataInputByteBuffer();
+ in.reset(meta);
+ return in.readInt();
+ }
+
+ // Called by the NodeManager when an application starts on this node;
+ // records the app's user so its shuffle output can be located later.
+ // NOTE(review): `userName` is declared static but assigned via `this` —
+ // with multiple concurrent applications the last-initialized app's user
+ // wins; per-app lookups should presumably go through userRsrc instead.
+ @Override
+ public void initializeApplication(ApplicationInitializationContext appInitContext) {
+ // TODO these bytes should be versioned
+ // TODO: Once SHuffle is out of NM, this can use MR APIs
+ this.appId = appInitContext.getApplicationId();
+ this.queryId = TajoIdUtils.parseQueryId(appId.toString());
+ this.userName = appInitContext.getUser();
+ userRsrc.put(this.appId.toString(), this.userName);
+ }
+
+ // Called by the NodeManager when an application finishes; drops its
+ // user mapping registered in initializeApplication().
+ @Override
+ public void stopApplication(ApplicationTerminationContext appStopContext) {
+ userRsrc.remove(appStopContext.getApplicationId().toString());
+ }
+
+ // Reads fadvise/readahead tuning, builds the Netty boss/worker thread
+ // pools and channel factory, and prepares the local filesystem handle.
+ // NOTE(review): the catch swallows Throwable with only a log — init
+ // failures leave the service half-initialized (e.g. selector null) and
+ // surface later in start(); confirm this is intentional.
+ @Override
+ public synchronized void init(Configuration conf) {
+ try {
+ manageOsCache = conf.getBoolean(SHUFFLE_MANAGE_OS_CACHE,
+ DEFAULT_SHUFFLE_MANAGE_OS_CACHE);
+
+ readaheadLength = conf.getInt(SHUFFLE_READAHEAD_BYTES,
+ DEFAULT_SHUFFLE_READAHEAD_BYTES);
+
+ ThreadFactory bossFactory = new ThreadFactoryBuilder()
+ .setNameFormat("PullServerAuxService Netty Boss #%d")
+ .build();
+ ThreadFactory workerFactory = new ThreadFactoryBuilder()
+ .setNameFormat("PullServerAuxService Netty Worker #%d")
+ .build();
+
+ selector = new NioServerSocketChannelFactory(
+ Executors.newCachedThreadPool(bossFactory),
+ Executors.newCachedThreadPool(workerFactory));
+
+ localFS = new LocalFileSystem();
+ // Pass a copy so later mutations of the service config do not leak
+ // back into the caller's Configuration.
+ super.init(new Configuration(conf));
+ } catch (Throwable t) {
+ LOG.error(t);
+ }
+ }
+
+ // TODO change AbstractService to throw InterruptedException
+ // Binds the HTTP shuffle server. The configured port may be 0 (ephemeral):
+ // after bind, the actual local port is read back and written into both the
+ // Configuration and the PullServer handler.
+ @Override
+ public synchronized void start() {
+ Configuration conf = getConfig();
+ ServerBootstrap bootstrap = new ServerBootstrap(selector);
+ try {
+ pipelineFact = new HttpPipelineFactory(conf);
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ bootstrap.setPipelineFactory(pipelineFact);
+ port = conf.getInt(ConfVars.PULLSERVER_PORT.varname,
+ ConfVars.PULLSERVER_PORT.defaultIntVal);
+ Channel ch = bootstrap.bind(new InetSocketAddress(port));
+ // Track the listening channel so stop() can close it with the rest.
+ accepted.add(ch);
+ port = ((InetSocketAddress)ch.getLocalAddress()).getPort();
+ conf.set(ConfVars.PULLSERVER_PORT.varname, Integer.toString(port));
+ pipelineFact.PullServer.setPort(port);
+ LOG.info(getName() + " listening on port " + port);
+ super.start();
+
+ sslFileBufferSize = conf.getInt(SUFFLE_SSL_FILE_BUFFER_SIZE_KEY,
+ DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE);
+ }
+
+ // Returns the actual port the shuffle server is bound to (resolved in
+ // start(); meaningful only after the service has started).
+ public int getPort() {
+ return port;
+ }
+
+ // Closes all accepted channels (bounded wait), releases the shared
+ // channel-factory resources, tears down the SSL factory, and closes the
+ // local filesystem. Errors are logged but never prevent super.stop().
+ @Override
+ public synchronized void stop() {
+ try {
+ accepted.close().awaitUninterruptibly(10, TimeUnit.SECONDS);
+ // A throwaway bootstrap wrapping the shared `selector` factory;
+ // releaseExternalResources() delegates to that factory, shutting down
+ // the boss/worker pools created in init().
+ ServerBootstrap bootstrap = new ServerBootstrap(selector);
+ bootstrap.releaseExternalResources();
+ pipelineFact.destroy();
+
+ localFS.close();
+ } catch (Throwable t) {
+ LOG.error(t);
+ } finally {
+ super.stop();
+ }
+ }
+
+ // Serializes this service's port for delivery to containers via the
+ // auxiliary-service metadata channel; returns null on failure.
+ @Override
+ public synchronized ByteBuffer getMetaData() {
+ try {
+ return serializeMetaData(port);
+ } catch (IOException e) {
+ LOG.error("Error during getMeta", e);
+ // TODO add API to AuxiliaryServices to report failures
+ return null;
+ }
+ }
+
+ // Pipeline factory for the shuffle server: optional SSL, HTTP codec,
+ // request aggregation, chunked writes, then the shared PullServer handler.
+ class HttpPipelineFactory implements ChannelPipelineFactory {
+
+ // Single handler instance shared across all pipelines (field name
+ // intentionally matches the class name in the original code).
+ final PullServer PullServer;
+ private SSLFactory sslFactory;
+
+ public HttpPipelineFactory(Configuration conf) throws Exception {
+ PullServer = new PullServer(conf);
+ if (conf.getBoolean(ConfVars.SHUFFLE_SSL_ENABLED_KEY.varname,
+ ConfVars.SHUFFLE_SSL_ENABLED_KEY.defaultBoolVal)) {
+ sslFactory = new SSLFactory(SSLFactory.Mode.SERVER, conf);
+ sslFactory.init();
+ }
+ }
+
+ public void destroy() {
+ if (sslFactory != null) {
+ sslFactory.destroy();
+ }
+ }
+
+ @Override
+ public ChannelPipeline getPipeline() throws Exception {
+ ChannelPipeline pipeline = Channels.pipeline();
+ if (sslFactory != null) {
+ pipeline.addLast("ssl", new SslHandler(sslFactory.createSSLEngine()));
+ }
+ pipeline.addLast("decoder", new HttpRequestDecoder());
+ // 64 KiB cap on aggregated request size; larger requests raise
+ // TooLongFrameException, mapped to 400 in PullServer.exceptionCaught.
+ pipeline.addLast("aggregator", new HttpChunkAggregator(1 << 16));
+ pipeline.addLast("encoder", new HttpResponseEncoder());
+ pipeline.addLast("chunking", new ChunkedWriteHandler());
+ pipeline.addLast("shuffle", PullServer);
+ return pipeline;
+ // TODO factor security manager into pipeline
+ // TODO factor out encode/decode to permit binary shuffle
+ // TODO factor out decode of index to permit alt. models
+ }
+ }
+
+ // Sharable handler serving shuffle data. Understands three repartition
+ // types from the "type" parameter: "r" (range, keyed by start/end),
+ // "h" (hash) and "s" (scattered hash). Files are resolved through the
+ // LocalDirAllocator under the worker temporal directory.
+ class PullServer extends SimpleChannelUpstreamHandler {
+ private final Configuration conf;
+ private final LocalDirAllocator lDirAlloc = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);
+ private int port;
+
+ public PullServer(Configuration conf) {
+ this.conf = conf;
+ this.port = conf.getInt(ConfVars.PULLSERVER_PORT.varname, ConfVars.PULLSERVER_PORT.defaultIntVal);
+ }
+
+ // Updated by start() once the real (possibly ephemeral) port is known.
+ public void setPort(int port) {
+ this.port = port;
+ }
+
+ // Expands comma-separated task-id values; null in, null out.
+ private List<String> splitMaps(List<String> mapq) {
+ if (null == mapq) {
+ return null;
+ }
+ final List<String> ret = new ArrayList<String>();
+ for (String s : mapq) {
+ Collections.addAll(ret, s.split(","));
+ }
+ return ret;
+ }
+
+ // Handles one shuffle GET: validates parameters, resolves the requested
+ // chunks for the repartition type, then streams them back.
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+ throws Exception {
+
+ HttpRequest request = (HttpRequest) e.getMessage();
+ if (request.getMethod() != GET) {
+ sendError(ctx, METHOD_NOT_ALLOWED);
+ return;
+ }
+
+ // Parsing the URL into key-values
+ final Map<String, List<String>> params =
+ new QueryStringDecoder(request.getUri()).getParameters();
+ final List<String> types = params.get("type");
+ final List<String> taskIdList = params.get("ta");
+ final List<String> subQueryIds = params.get("sid");
+ final List<String> partitionIds = params.get("p");
+
+ if (types == null || taskIdList == null || subQueryIds == null
+ || partitionIds == null) {
+ sendError(ctx, "Required type, taskIds, subquery Id, and partition id",
+ BAD_REQUEST);
+ return;
+ }
+
+ if (types.size() != 1 || subQueryIds.size() != 1) {
+ sendError(ctx, "Required type, taskIds, subquery Id, and partition id",
+ BAD_REQUEST);
+ return;
+ }
+
+ final List<FileChunk> chunks = Lists.newArrayList();
+
+ String repartitionType = types.get(0);
+ String sid = subQueryIds.get(0);
+ String partitionId = partitionIds.get(0);
+ List<String> taskIds = splitMaps(taskIdList);
+
+ // the working dir of tajo worker for each query
+ String queryBaseDir = queryId + "/output" + "/";
+
+ LOG.info("PullServer request param: repartitionType=" + repartitionType +
+ ", sid=" + sid + ", partitionId=" + partitionId + ", taskIds=" + taskIdList);
+
+ // NOTE(review): a missing/empty temporal dir is only logged, then
+ // processing continues; the allocator below will likely fail instead.
+ String taskLocalDir = conf.get(ConfVars.WORKER_TEMPORAL_DIR.varname);
+ if (taskLocalDir == null ||
+ taskLocalDir.equals("")) {
+ LOG.error("Tajo local directory should be specified.");
+ }
+ LOG.info("PullServer baseDir: " + taskLocalDir + "/" + queryBaseDir);
+
+ // if a subquery requires a range partitioning
+ if (repartitionType.equals("r")) {
+ // Range shuffle reads a single task's sorted output plus its BST
+ // index; the requested key range comes in base64 via start/end.
+ String ta = taskIds.get(0);
+ Path path = localFS.makeQualified(
+ lDirAlloc.getLocalPathToRead(queryBaseDir + "/" + sid + "/"
+ + ta + "/output/", conf));
+
+ String startKey = params.get("start").get(0);
+ String endKey = params.get("end").get(0);
+ boolean last = params.get("final") != null;
+
+ FileChunk chunk;
+ try {
+ chunk = getFileCunks(path, startKey, endKey, last);
+ } catch (Throwable t) {
+ LOG.error("ERROR Request: " + request.getUri(), t);
+ sendError(ctx, "Cannot get file chunks to be sent", BAD_REQUEST);
+ return;
+ }
+ if (chunk != null) {
+ chunks.add(chunk);
+ }
+
+ // if a subquery requires a hash repartition or a scattered hash repartition
+ } else if (repartitionType.equals("h") || repartitionType.equals("s")) {
+ for (String ta : taskIds) {
+ Path path = localFS.makeQualified(
+ lDirAlloc.getLocalPathToRead(queryBaseDir + "/" + sid + "/" +
+ ta + "/output/" + partitionId, conf));
+ File file = new File(path.toUri());
+ FileChunk chunk = new FileChunk(file, 0, file.length());
+ chunks.add(chunk);
+ }
+ } else {
+ // NOTE(review): unknown type returns silently with no HTTP response
+ // — the client presumably times out; an error response would be
+ // friendlier.
+ LOG.error("Unknown repartition type: " + repartitionType);
+ return;
+ }
+
+ // Write the content.
+ Channel ch = e.getChannel();
+ if (chunks.size() == 0) {
+ HttpResponse response = new DefaultHttpResponse(HTTP_1_1, NO_CONTENT);
+ ch.write(response);
+ if (!isKeepAlive(request)) {
+ ch.close();
+ }
+ } else {
+ FileChunk[] file = chunks.toArray(new FileChunk[chunks.size()]);
+ HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
+ long totalSize = 0;
+ for (FileChunk chunk : file) {
+ totalSize += chunk.length();
+ }
+ setContentLength(response, totalSize);
+
+ // Write the initial line and the header.
+ ch.write(response);
+
+ ChannelFuture writeFuture = null;
+
+ for (FileChunk chunk : file) {
+ writeFuture = sendFile(ctx, ch, chunk);
+ if (writeFuture == null) {
+ // Content-Length was already sent; client sees a short body.
+ sendError(ctx, NOT_FOUND);
+ return;
+ }
+ }
+
+ // Decide whether to close the connection or not.
+ if (!isKeepAlive(request)) {
+ // Close the connection when the whole content is written out.
+ writeFuture.addListener(ChannelFutureListener.CLOSE);
+ }
+ }
+ }
+
+ // Streams one chunk: zero-copy fadvised FileRegion on plaintext
+ // channels, buffered FadvisedChunkedFile when SSL is in the pipeline.
+ // Returns null when the spill file is missing. Metrics are updated
+ // optimistically (bytes counted before the write completes).
+ private ChannelFuture sendFile(ChannelHandlerContext ctx,
+ Channel ch,
+ FileChunk file) throws IOException {
+ RandomAccessFile spill;
+ try {
+ spill = new RandomAccessFile(file.getFile(), "r");
+ } catch (FileNotFoundException e) {
+ LOG.info(file.getFile() + " not found");
+ return null;
+ }
+ ChannelFuture writeFuture;
+ if (ch.getPipeline().get(SslHandler.class) == null) {
+ final FadvisedFileRegion partition = new FadvisedFileRegion(spill,
+ file.startOffset(), file.length(), manageOsCache, readaheadLength,
+ readaheadPool, file.getFile().getAbsolutePath());
+ writeFuture = ch.write(partition);
+ writeFuture.addListener(new FileCloseListener(partition, null, 0, null));
+ } else {
+ // HTTPS cannot be done with zero copy.
+ final FadvisedChunkedFile chunk = new FadvisedChunkedFile(spill,
+ file.startOffset(), file.length(), sslFileBufferSize,
+ manageOsCache, readaheadLength, readaheadPool,
+ file.getFile().getAbsolutePath());
+ writeFuture = ch.write(chunk);
+ }
+ metrics.shuffleConnections.incr();
+ metrics.shuffleOutputBytes.incr(file.length()); // optimistic
+ return writeFuture;
+ }
+
+ // Convenience overload: error response with an empty message body.
+ private void sendError(ChannelHandlerContext ctx,
+ HttpResponseStatus status) {
+ sendError(ctx, "", status);
+ }
+
+ // Writes a plain-text error response and closes the connection.
+ private void sendError(ChannelHandlerContext ctx, String message,
+ HttpResponseStatus status) {
+ HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status);
+ response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8");
+ response.setContent(
+ ChannelBuffers.copiedBuffer(message, CharsetUtil.UTF_8));
+
+ // Close the connection as soon as the error message is sent.
+ ctx.getChannel().write(response).addListener(ChannelFutureListener.CLOSE);
+ }
+
+ // Frame-size violations map to 400; other failures are logged (twice —
+ // NOTE(review): the second LOG.error looks redundant) and answered
+ // with 500 if the channel is still connected.
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
+ throws Exception {
+ Channel ch = e.getChannel();
+ Throwable cause = e.getCause();
+ if (cause instanceof TooLongFrameException) {
+ sendError(ctx, BAD_REQUEST);
+ return;
+ }
+
+ LOG.error("PullServer error: ", cause);
+ if (ch.isConnected()) {
+ LOG.error("PullServer error " + e);
+ sendError(ctx, INTERNAL_SERVER_ERROR);
+ }
+ }
+ }
+
+ /**
+ * Resolves the byte range of the sorted "output" file covering
+ * [startKey, endKey], using the BST index stored beside it.
+ * Keys arrive Base64-encoded and are decoded with the index key schema.
+ * Returns null when the index is empty or the request is out of range.
+ *
+ * NOTE(review): the method name keeps the historical typo ("Cunks")
+ * because callers reference it; renaming would break them.
+ * NOTE(review): idxReader does not appear to be closed on any path in
+ * this method — possible reader/file handle leak, verify BSTIndexReader
+ * lifecycle.
+ *
+ * @param outDir directory containing the "index" and "output" files
+ * @param startKey Base64-encoded first key of the requested range
+ * @param endKey Base64-encoded last key of the requested range
+ * @param last whether this is the final (open-ended) range request
+ */
+ public FileChunk getFileCunks(Path outDir,
+ String startKey,
+ String endKey,
+ boolean last) throws IOException {
+ BSTIndex index = new BSTIndex(new TajoConf());
+ BSTIndex.BSTIndexReader idxReader =
+ index.getIndexReader(new Path(outDir, "index"));
+ idxReader.open();
+ Schema keySchema = idxReader.getKeySchema();
+ TupleComparator comparator = idxReader.getComparator();
+
+ // NOTE(review): log message is missing its closing ')'.
+ LOG.info("BSTIndex is loaded from disk (" + idxReader.getFirstKey() + ", "
+ + idxReader.getLastKey());
+
+ File data = new File(URI.create(outDir.toUri() + "/output"));
+ byte [] startBytes = Base64.decodeBase64(startKey);
+ byte [] endBytes = Base64.decodeBase64(endKey);
+
+ RowStoreDecoder decoder = RowStoreUtil.createDecoder(keySchema);
+ Tuple start;
+ Tuple end;
+ try {
+ start = decoder.toTuple(startBytes);
+ } catch (Throwable t) {
+ throw new IllegalArgumentException("StartKey: " + startKey
+ + ", decoded byte size: " + startBytes.length, t);
+ }
+
+ try {
+ end = decoder.toTuple(endBytes);
+ } catch (Throwable t) {
+ throw new IllegalArgumentException("EndKey: " + endKey
+ + ", decoded byte size: " + endBytes.length, t);
+ }
+
+
+ // For a descending-first-key sort the request range arrives reversed.
+ if(!comparator.isAscendingFirstKey()) {
+ Tuple tmpKey = start;
+ start = end;
+ end = tmpKey;
+ }
+
+ LOG.info("GET Request for " + data.getAbsolutePath() + " (start="+start+", end="+ end +
+ (last ? ", last=true" : "") + ")");
+
+ if (idxReader.getFirstKey() == null && idxReader.getLastKey() == null) { // if # of rows is zero
+ LOG.info("There is no contents");
+ return null;
+ }
+
+ // Requested range lies entirely outside the indexed key interval.
+ if (comparator.compare(end, idxReader.getFirstKey()) < 0 ||
+ comparator.compare(idxReader.getLastKey(), start) < 0) {
+ LOG.info("Out of Scope (indexed data [" + idxReader.getFirstKey() + ", " + idxReader.getLastKey() +
+ "], but request start:" + start + ", end: " + end);
+ return null;
+ }
+
+ long startOffset;
+ long endOffset;
+ try {
+ startOffset = idxReader.find(start);
+ } catch (IOException ioe) {
+ LOG.error("State Dump (the requested range: "
+ + "[" + start + ", " + end +")" + ", idx min: "
+ + idxReader.getFirstKey() + ", idx max: "
+ + idxReader.getLastKey());
+ throw ioe;
+ }
+ try {
+ endOffset = idxReader.find(end);
+ if (endOffset == -1) {
+ // Exact end key absent: fall back to the nearest following entry.
+ endOffset = idxReader.find(end, true);
+ }
+ } catch (IOException ioe) {
+ LOG.error("State Dump (the requested range: "
+ + "[" + start + ", " + end +")" + ", idx min: "
+ + idxReader.getFirstKey() + ", idx max: "
+ + idxReader.getLastKey());
+ throw ioe;
+ }
+
+ // if startOffset == -1 then case 2-1 or case 3
+ if (startOffset == -1) { // this is a hack
+ // if case 2-1 or case 3
+ try {
+ startOffset = idxReader.find(start, true);
+ } catch (IOException ioe) {
+ LOG.error("State Dump (the requested range: "
+ + "[" + start + ", " + end +")" + ", idx min: "
+ + idxReader.getFirstKey() + ", idx max: "
+ + idxReader.getLastKey());
+ throw ioe;
+ }
+ }
+
+ if (startOffset == -1) {
+ throw new IllegalStateException("startOffset " + startOffset + " is negative \n" +
+ "State Dump (the requested range: "
+ + "[" + start + ", " + end +")" + ", idx min: " + idxReader.getFirstKey() + ", idx max: "
+ + idxReader.getLastKey());
+ }
+
+ // if greater than indexed values
+ if (last || (endOffset == -1
+ && comparator.compare(idxReader.getLastKey(), end) < 0)) {
+ endOffset = data.length();
+ }
+
+ FileChunk chunk = new FileChunk(data, startOffset, endOffset - startOffset);
+ LOG.info("Retrieve File Chunk: " + chunk);
+ return chunk;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerUtil.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerUtil.java
new file mode 100644
index 0000000..564950f
--- /dev/null
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerUtil.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.pullserver;
+
+import org.apache.commons.lang.reflect.MethodUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.nativeio.NativeIO;
+
+import java.io.FileDescriptor;
+import java.lang.reflect.Method;
+
+public class PullServerUtil {
+ private static final Log LOG = LogFactory.getLog(PullServerUtil.class);
+
+ private static boolean nativeIOPossible = false;
+ private static Method posixFadviseIfPossible;
+
+ static {
+ if (NativeIO.isAvailable() && loadNativeIO()) {
+ nativeIOPossible = true;
+ } else {
+ LOG.warn("Unable to load hadoop nativeIO");
+ }
+ }
+
+ public static boolean isNativeIOPossible() {
+ return nativeIOPossible;
+ }
+
+ /**
+ * Call posix_fadvise on the given file descriptor. See the manpage
+ * for this syscall for more information. On systems where this
+ * call is not available, does nothing.
+ */
+ public static void posixFadviseIfPossible(String identifier, java.io.FileDescriptor fd,
+ long offset, long len, int flags) {
+ if (nativeIOPossible) {
+ try {
+ posixFadviseIfPossible.invoke(null, identifier, fd, offset, len, flags);
+ } catch (Throwable t) {
+ nativeIOPossible = false;
+ LOG.warn("Failed to manage OS cache for " + identifier, t);
+ }
+ }
+ }
+
+ /* load hadoop native method if possible */
+ private static boolean loadNativeIO() {
+ boolean loaded = true;
+ if (nativeIOPossible) return loaded;
+
+ Class[] parameters = {String.class, FileDescriptor.class, Long.TYPE, Long.TYPE, Integer.TYPE};
+ try {
+ Method getCacheManipulator = MethodUtils.getAccessibleMethod(NativeIO.POSIX.class, "getCacheManipulator", new Class[0]);
+ Class posixClass;
+ if (getCacheManipulator != null) {
+ Object posix = MethodUtils.invokeStaticMethod(NativeIO.POSIX.class, "getCacheManipulator", null);
+ posixClass = posix.getClass();
+ } else {
+ posixClass = NativeIO.POSIX.class;
+ }
+ posixFadviseIfPossible = MethodUtils.getAccessibleMethod(posixClass, "posixFadviseIfPossible", parameters);
+ } catch (Throwable e) {
+ loaded = false;
+ LOG.warn("Failed to access posixFadviseIfPossible :" + e.getMessage());
+ }
+
+ if (posixFadviseIfPossible == null) {
+ loaded = false;
+ }
+ return loaded;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServer.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServer.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServer.java
new file mode 100644
index 0000000..d030eed
--- /dev/null
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServer.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.pullserver;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.pullserver.PullServerAuxService.PullServer;
+import org.apache.tajo.util.StringUtils;
+
+/**
+ * Standalone launcher hosting a {@link TajoPullServerService} inside a
+ * Hadoop CompositeService. Only runs when the TAJO_PULLSERVER_STANDALONE
+ * environment variable is 'true'.
+ */
+public class TajoPullServer extends CompositeService {
+ private static final Log LOG = LogFactory.getLog(TajoPullServer.class);
+
+ private TajoPullServerService pullService;
+ // NOTE(review): retained for source compatibility; currently unread.
+ private TajoConf systemConf;
+
+ public TajoPullServer() {
+ super(TajoPullServer.class.getName());
+ }
+
+ /** Registers the pull-server child service, then initializes the composite. */
+ @Override
+ public void init(Configuration conf) {
+ this.systemConf = (TajoConf)conf;
+ pullService = new TajoPullServerService();
+ addService(pullService);
+
+ super.init(conf);
+ }
+
+ /** Convenience entry: init + start in one call. */
+ public void startPullServer(TajoConf systemConf) {
+ init(systemConf);
+ start();
+ }
+
+ public static void main(String[] args) throws Exception {
+ // Log this launcher class, not the inner PullServer handler.
+ StringUtils.startupShutdownMessage(TajoPullServer.class, args, LOG);
+
+ if (!TajoPullServerService.isStandalone()) {
+ LOG.fatal("TAJO_PULLSERVER_STANDALONE env variable is not 'true'");
+ return;
+ }
+
+ TajoConf tajoConf = new TajoConf();
+ tajoConf.addResource(new Path(TajoConstants.SYSTEM_CONF_FILENAME));
+
+ (new TajoPullServer()).startPullServer(tajoConf);
+ }
+}
[2/3] tajo git commit: TAJO-1229: rename tajo-yarn-pullserver to
tajo-pullserver.
Posted by hy...@apache.org.
http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
new file mode 100644
index 0000000..5a4e69f
--- /dev/null
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
@@ -0,0 +1,808 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.pullserver;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputByteBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.ReadaheadPool;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MutableCounterInt;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
+import org.apache.hadoop.security.ssl.SSLFactory;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.pullserver.retriever.FileChunk;
+import org.apache.tajo.rpc.RpcChannelFactory;
+import org.apache.tajo.storage.HashShuffleAppenderManager;
+import org.apache.tajo.storage.RowStoreUtil;
+import org.apache.tajo.storage.RowStoreUtil.RowStoreDecoder;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.TupleComparator;
+import org.apache.tajo.storage.index.bst.BSTIndex;
+import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.*;
+import org.jboss.netty.channel.group.ChannelGroup;
+import org.jboss.netty.channel.group.DefaultChannelGroup;
+import org.jboss.netty.handler.codec.http.*;
+import org.jboss.netty.handler.ssl.SslHandler;
+import org.jboss.netty.handler.stream.ChunkedWriteHandler;
+import org.jboss.netty.util.CharsetUtil;
+
+import java.io.*;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
+import static org.jboss.netty.handler.codec.http.HttpHeaders.isKeepAlive;
+import static org.jboss.netty.handler.codec.http.HttpHeaders.setContentLength;
+import static org.jboss.netty.handler.codec.http.HttpMethod.GET;
+import static org.jboss.netty.handler.codec.http.HttpResponseStatus.*;
+import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;
+
+public class TajoPullServerService extends AbstractService {
+
+ private static final Log LOG = LogFactory.getLog(TajoPullServerService.class);
+
+ // Config keys controlling posix_fadvise page-cache management and readahead.
+ public static final String SHUFFLE_MANAGE_OS_CACHE = "tajo.pullserver.manage.os.cache";
+ public static final boolean DEFAULT_SHUFFLE_MANAGE_OS_CACHE = true;
+
+ public static final String SHUFFLE_READAHEAD_BYTES = "tajo.pullserver.readahead.bytes";
+ public static final int DEFAULT_SHUFFLE_READAHEAD_BYTES = 4 * 1024 * 1024;
+
+ // Bound port; rewritten with the actual ephemeral port after bind().
+ private int port;
+ private ChannelFactory selector;
+ // Tracks the server channel plus every accepted connection for shutdown.
+ private final ChannelGroup accepted = new DefaultChannelGroup();
+ private HttpPipelineFactory pipelineFact;
+ private int sslFileBufferSize;
+
+ private ApplicationId appId;
+ private FileSystem localFS;
+
+ /**
+ * Should the shuffle use posix_fadvise calls to manage the OS cache during
+ * sendfile
+ */
+ private boolean manageOsCache;
+ private int readaheadLength;
+ private ReadaheadPool readaheadPool = ReadaheadPool.getInstance();
+
+
+ public static final String PULLSERVER_SERVICEID = "tajo.pullserver";
+
+ // appId -> user, populated by initApp()/stopApp().
+ private static final Map<String,String> userRsrc =
+ new ConcurrentHashMap<String,String>();
+ private String userName;
+
+ public static final String SUFFLE_SSL_FILE_BUFFER_SIZE_KEY =
+ "tajo.pullserver.ssl.file.buffer.size";
+
+ public static final int DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE = 60 * 1024;
+
+ // Set once at class load from the TAJO_PULLSERVER_STANDALONE env variable.
+ private static boolean STANDALONE = false;
+
+ static {
+ String standalone = System.getenv("TAJO_PULLSERVER_STANDALONE");
+ if (!StringUtils.isEmpty(standalone)) {
+ STANDALONE = standalone.equalsIgnoreCase("true");
+ }
+ }
+
+ // Hadoop metrics2 counters for shuffle traffic; also used as the write
+ // completion listener so success/failure counts track each transfer.
+ @Metrics(name="PullServerShuffleMetrics", about="PullServer output metrics", context="tajo")
+ static class ShuffleMetrics implements ChannelFutureListener {
+ @Metric({"OutputBytes","PullServer output in bytes"})
+ MutableCounterLong shuffleOutputBytes;
+ @Metric({"Failed","# of failed shuffle outputs"})
+ MutableCounterInt shuffleOutputsFailed;
+ @Metric({"Succeeded","# of succeeded shuffle outputs"})
+ MutableCounterInt shuffleOutputsOK;
+ @Metric({"Connections","# of current shuffle connections"})
+ MutableGaugeInt shuffleConnections;
+
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ if (future.isSuccess()) {
+ shuffleOutputsOK.incr();
+ } else {
+ shuffleOutputsFailed.incr();
+ }
+ shuffleConnections.decr();
+ }
+ }
+
+ final ShuffleMetrics metrics;
+
+ /** Visible-for-testing constructor taking an explicit metrics system. */
+ TajoPullServerService(MetricsSystem ms) {
+ super("httpshuffle");
+ metrics = ms.register(new ShuffleMetrics());
+ }
+
+ @SuppressWarnings("UnusedDeclaration")
+ public TajoPullServerService() {
+ this(DefaultMetricsSystem.instance());
+ }
+
+ /**
+ * Serialize the shuffle port into a ByteBuffer for use later on.
+ * @param port the port to be sent to the ApplicationMaster
+ * @return the serialized form of the port.
+ */
+ public static ByteBuffer serializeMetaData(int port) throws IOException {
+ //TODO these bytes should be versioned
+ DataOutputBuffer port_dob = new DataOutputBuffer();
+ port_dob.writeInt(port);
+ return ByteBuffer.wrap(port_dob.getData(), 0, port_dob.getLength());
+ }
+
+ /**
+ * A helper function to deserialize the metadata returned by PullServerAuxService.
+ * @param meta the metadata returned by the PullServerAuxService
+ * @return the port the PullServer Handler is listening on to serve shuffle data.
+ */
+ public static int deserializeMetaData(ByteBuffer meta) throws IOException {
+ //TODO this should be returning a class not just an int
+ DataInputByteBuffer in = new DataInputByteBuffer();
+ in.reset(meta);
+ return in.readInt();
+ }
+
+ /** Records the owning user for an application. The secret is currently unused. */
+ public void initApp(String user, ApplicationId appId, ByteBuffer secret) {
+ // TODO these bytes should be versioned
+ // TODO: Once SHuffle is out of NM, this can use MR APIs
+ this.appId = appId;
+ this.userName = user;
+ userRsrc.put(appId.toString(), user);
+ }
+
+ /** Drops the user mapping registered by {@link #initApp}. */
+ public void stopApp(ApplicationId appId) {
+ userRsrc.remove(appId.toString());
+ }
+
+ /**
+ * Reads cache/readahead tuning, builds the netty server channel factory,
+ * and forces the configured pull-server port before delegating to super.
+ */
+ @Override
+ public void init(Configuration conf) {
+ try {
+ manageOsCache = conf.getBoolean(SHUFFLE_MANAGE_OS_CACHE,
+ DEFAULT_SHUFFLE_MANAGE_OS_CACHE);
+
+ readaheadLength = conf.getInt(SHUFFLE_READAHEAD_BYTES,
+ DEFAULT_SHUFFLE_READAHEAD_BYTES);
+
+ int workerNum = conf.getInt("tajo.shuffle.rpc.server.worker-thread-num",
+ Runtime.getRuntime().availableProcessors() * 2);
+
+ selector = RpcChannelFactory.createServerChannelFactory("PullServerAuxService", workerNum);
+
+ localFS = new LocalFileSystem();
+
+ conf.setInt(TajoConf.ConfVars.PULLSERVER_PORT.varname
+ , TajoConf.ConfVars.PULLSERVER_PORT.defaultIntVal);
+ super.init(conf);
+ LOG.info("Tajo PullServer initialized: readaheadLength=" + readaheadLength);
+ } catch (Throwable t) {
+ // NOTE(review): any init failure is only logged; the service keeps
+ // running half-initialized. Consider rethrowing — verify lifecycle.
+ LOG.error(t);
+ }
+ }
+
+ // TODO change AbstractService to throw InterruptedException
+ /**
+ * Binds the HTTP shuffle endpoint. The actually bound (possibly ephemeral)
+ * port is written back into the configuration and, in standalone mode,
+ * persisted to the pull-server port file for workers to discover.
+ */
+ @Override
+ public synchronized void serviceInit(Configuration conf) throws Exception {
+ ServerBootstrap bootstrap = new ServerBootstrap(selector);
+
+ try {
+ pipelineFact = new HttpPipelineFactory(conf);
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ bootstrap.setPipelineFactory(pipelineFact);
+
+ port = conf.getInt(ConfVars.PULLSERVER_PORT.varname,
+ ConfVars.PULLSERVER_PORT.defaultIntVal);
+ Channel ch = bootstrap.bind(new InetSocketAddress(port));
+
+ // Track the listen channel so stop() can close it with the group.
+ accepted.add(ch);
+ port = ((InetSocketAddress)ch.getLocalAddress()).getPort();
+ conf.set(ConfVars.PULLSERVER_PORT.varname, Integer.toString(port));
+ pipelineFact.PullServer.setPort(port);
+ LOG.info(getName() + " listening on port " + port);
+
+ sslFileBufferSize = conf.getInt(SUFFLE_SSL_FILE_BUFFER_SIZE_KEY,
+ DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE);
+
+
+ if (STANDALONE) {
+ // Publish the bound port via $TAJO_PID_DIR/pullserver.port.
+ File pullServerPortFile = getPullServerPortFile();
+ if (pullServerPortFile.exists()) {
+ pullServerPortFile.delete();
+ }
+ pullServerPortFile.getParentFile().mkdirs();
+ LOG.info("Write PullServerPort to " + pullServerPortFile);
+ FileOutputStream out = null;
+ try {
+ out = new FileOutputStream(pullServerPortFile);
+ out.write(("" + port).getBytes());
+ } catch (Exception e) {
+ // A standalone pull server is useless if undiscoverable: abort.
+ LOG.fatal("PullServer exists cause can't write PullServer port to " + pullServerPortFile +
+ ", " + e.getMessage(), e);
+ System.exit(-1);
+ } finally {
+ IOUtils.closeStream(out);
+ }
+ }
+ super.serviceInit(conf);
+ LOG.info("TajoPullServerService started: port=" + port);
+ }
+
+ /** @return whether TAJO_PULLSERVER_STANDALONE was 'true' at class load. */
+ public static boolean isStandalone() {
+ return STANDALONE;
+ }
+
+ /**
+ * Resolves the file that advertises the bound pull-server port:
+ * $TAJO_PID_DIR/pullserver.port, falling back to /tmp when unset.
+ */
+ private static File getPullServerPortFile() {
+ String pidDir = System.getenv("TAJO_PID_DIR");
+ String baseDir = StringUtils.isEmpty(pidDir) ? "/tmp" : pidDir;
+ return new File(baseDir + "/pullserver.port");
+ }
+
+ // TODO change to get port from master or tajoConf
+ /**
+ * Reads the advertised pull-server port from the port file.
+ * @return the port, or -1 when the file is missing or unreadable
+ */
+ public static int readPullServerPort() {
+ FileInputStream in = null;
+ try {
+ File pullServerPortFile = getPullServerPortFile();
+
+ if (!pullServerPortFile.exists() || pullServerPortFile.isDirectory()) {
+ return -1;
+ }
+ in = new FileInputStream(pullServerPortFile);
+ byte[] buf = new byte[1024];
+ int readBytes = in.read(buf);
+ return Integer.parseInt(new String(buf, 0, readBytes));
+ } catch (IOException e) {
+ LOG.fatal(e.getMessage(), e);
+ return -1;
+ } finally {
+ IOUtils.closeStream(in);
+ }
+ }
+
+ /** @return the port this service is bound to (actual port after bind). */
+ public int getPort() {
+ return port;
+ }
+
+ /**
+ * Closes all channels (10s grace), releases netty resources, destroys the
+ * pipeline factory, and closes the local filesystem.
+ */
+ @Override
+ public synchronized void stop() {
+ try {
+ accepted.close().awaitUninterruptibly(10, TimeUnit.SECONDS);
+ // NOTE(review): a fresh bootstrap is created solely to release the
+ // shared channel factory's external resources — confirm intended.
+ ServerBootstrap bootstrap = new ServerBootstrap(selector);
+ bootstrap.releaseExternalResources();
+ pipelineFact.destroy();
+
+ localFS.close();
+ } catch (Throwable t) {
+ LOG.error(t);
+ } finally {
+ super.stop();
+ }
+ }
+
+ /** @return the serialized bound port, or null if serialization fails. */
+ public synchronized ByteBuffer getMeta() {
+ try {
+ return serializeMetaData(port);
+ } catch (IOException e) {
+ LOG.error("Error during getMeta", e);
+ // TODO add API to AuxiliaryServices to report failures
+ return null;
+ }
+ }
+
+ /**
+ * Builds the per-connection netty pipeline: optional SSL, HTTP codec,
+ * chunk aggregation, chunked writes, then the shared PullServer handler.
+ */
+ class HttpPipelineFactory implements ChannelPipelineFactory {
+
+ final PullServer PullServer;
+ private SSLFactory sslFactory;
+
+ public HttpPipelineFactory(Configuration conf) throws Exception {
+ PullServer = new PullServer(conf);
+ if (conf.getBoolean(ConfVars.SHUFFLE_SSL_ENABLED_KEY.varname,
+ ConfVars.SHUFFLE_SSL_ENABLED_KEY.defaultBoolVal)) {
+ sslFactory = new SSLFactory(SSLFactory.Mode.SERVER, conf);
+ sslFactory.init();
+ }
+ }
+
+ /** Tears down SSL state; safe to call when SSL was never enabled. */
+ public void destroy() {
+ if (sslFactory != null) {
+ sslFactory.destroy();
+ }
+ }
+
+ @Override
+ public ChannelPipeline getPipeline() throws Exception {
+ ChannelPipeline pipeline = Channels.pipeline();
+ if (sslFactory != null) {
+ pipeline.addLast("ssl", new SslHandler(sslFactory.createSSLEngine()));
+ }
+
+ int maxChunkSize = getConfig().getInt(ConfVars.SHUFFLE_FETCHER_CHUNK_MAX_SIZE.varname,
+ ConfVars.SHUFFLE_FETCHER_CHUNK_MAX_SIZE.defaultIntVal);
+ pipeline.addLast("codec", new HttpServerCodec(4096, 8192, maxChunkSize));
+ pipeline.addLast("aggregator", new HttpChunkAggregator(1 << 16));
+ pipeline.addLast("chunking", new ChunkedWriteHandler());
+ pipeline.addLast("shuffle", PullServer);
+ return pipeline;
+ // TODO factor security manager into pipeline
+ // TODO factor out encode/decode to permit binary shuffle
+ // TODO factor out decode of index to permit alt. models
+ }
+ }
+
+
+ // Per-request transfer bookkeeping, keyed by request URI.
+ Map<String, ProcessingStatus> processingStatusMap = new ConcurrentHashMap<String, ProcessingStatus>();
+
+ /** Callback from FileCloseListener when one chunk finishes sending. */
+ public void completeFileChunk(FileRegion filePart,
+ String requestUri,
+ long startTime) {
+ ProcessingStatus status = processingStatusMap.get(requestUri);
+ if (status != null) {
+ status.decrementRemainFiles(filePart, startTime);
+ }
+ }
+
+ /**
+ * Tracks how many file chunks of one pull request remain and collects
+ * min/max/slow-transfer timing, logged once the last chunk completes.
+ */
+ class ProcessingStatus {
+ String requestUri;
+ int numFiles;
+ AtomicInteger remainFiles;
+ long startTime;
+ long makeFileListTime;
+ long minTime = Long.MAX_VALUE;
+ long maxTime;
+ int numSlowFile;
+
+ public ProcessingStatus(String requestUri) {
+ this.requestUri = requestUri;
+ this.startTime = System.currentTimeMillis();
+ }
+
+ public void setNumFiles(int numFiles) {
+ this.numFiles = numFiles;
+ this.remainFiles = new AtomicInteger(numFiles);
+ }
+ public void decrementRemainFiles(FileRegion filePart, long fileStartTime) {
+ // Synchronizes on the counter so the timing fields update atomically
+ // with the decrement.
+ synchronized(remainFiles) {
+ long fileSendTime = System.currentTimeMillis() - fileStartTime;
+ if (fileSendTime > 20 * 1000) {
+ LOG.info("PullServer send too long time: filePos=" + filePart.getPosition() + ", fileLen=" + filePart.getCount());
+ numSlowFile++;
+ }
+ if (fileSendTime > maxTime) {
+ maxTime = fileSendTime;
+ }
+ if (fileSendTime < minTime) {
+ minTime = fileSendTime;
+ }
+ int remain = remainFiles.decrementAndGet();
+ if (remain <= 0) {
+ processingStatusMap.remove(requestUri);
+ LOG.info("PullServer processing status: totalTime=" + (System.currentTimeMillis() - startTime) + " ms, " +
+ "makeFileListTime=" + makeFileListTime + " ms, minTime=" + minTime + " ms, maxTime=" + maxTime + " ms, " +
+ "numFiles=" + numFiles + ", numSlowFile=" + numSlowFile);
+ }
+ }
+ }
+ }
+
+ /**
+ * Netty upstream handler serving shuffle data over HTTP GET.
+ * One shared instance handles all connections (added to every pipeline).
+ */
+ class PullServer extends SimpleChannelUpstreamHandler {
+
+ private final Configuration conf;
+// private final IndexCache indexCache;
+ // Resolves relative shuffle paths against the worker temporal dirs.
+ private final LocalDirAllocator lDirAlloc =
+ new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);
+ private int port;
+
+ public PullServer(Configuration conf) throws IOException {
+ this.conf = conf;
+// indexCache = new IndexCache(new JobConf(conf));
+ this.port = conf.getInt(ConfVars.PULLSERVER_PORT.varname,
+ ConfVars.PULLSERVER_PORT.defaultIntVal);
+
+ // init local temporal dir
+ lDirAlloc.getAllLocalPathsToRead(".", conf);
+ }
+
+ /** Updates the advertised port after the actual bind. */
+ public void setPort(int port) {
+ this.port = port;
+ }
+
+ /**
+ * Expands comma-separated query-parameter values into one flat list.
+ * Returns null when the input list itself is null.
+ */
+ private List<String> splitMaps(List<String> mapq) {
+ if (mapq == null) {
+ return null;
+ }
+ List<String> expanded = new ArrayList<String>();
+ for (String entry : mapq) {
+ for (String piece : entry.split(",")) {
+ expanded.add(piece);
+ }
+ }
+ return expanded;
+ }
+
+ /** Registers each new connection in the shared channel group for shutdown. */
+ @Override
+ public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent evt)
+ throws Exception {
+
+ accepted.add(evt.getChannel());
+ LOG.info(String.format("Current number of shuffle connections (%d)", accepted.size()));
+ super.channelOpen(ctx, evt);
+
+ }
+
+ /**
+ * Handles one shuffle GET request. Validates the query parameters,
+ * resolves the requested chunks (range shuffle "r", hash "h", or
+ * scattered hash "s"), then streams them with Content-Length set to the
+ * total size. Error paths answer with an HTTP status and return.
+ */
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+ throws Exception {
+
+ HttpRequest request = (HttpRequest) e.getMessage();
+ if (request.getMethod() != GET) {
+ sendError(ctx, METHOD_NOT_ALLOWED);
+ return;
+ }
+
+ ProcessingStatus processingStatus = new ProcessingStatus(request.getUri().toString());
+ processingStatusMap.put(request.getUri().toString(), processingStatus);
+ // Parsing the URL into key-values
+ final Map<String, List<String>> params =
+ new QueryStringDecoder(request.getUri()).getParameters();
+ final List<String> types = params.get("type");
+ final List<String> qids = params.get("qid");
+ final List<String> taskIdList = params.get("ta");
+ final List<String> subQueryIds = params.get("sid");
+ final List<String> partIds = params.get("p");
+ final List<String> offsetList = params.get("offset");
+ final List<String> lengthList = params.get("length");
+
+ if (types == null || subQueryIds == null || qids == null || partIds == null) {
+ sendError(ctx, "Required queryId, type, subquery Id, and part id",
+ BAD_REQUEST);
+ return;
+ }
+
+ // NOTE(review): '&&' binds tighter than '||' here; verify the intended
+ // grouping of the single-value checks.
+ if (qids.size() != 1 && types.size() != 1 || subQueryIds.size() != 1) {
+ sendError(ctx, "Required qids, type, taskIds, subquery Id, and part id",
+ BAD_REQUEST);
+ return;
+ }
+
+ String partId = partIds.get(0);
+ String queryId = qids.get(0);
+ String shuffleType = types.get(0);
+ String sid = subQueryIds.get(0);
+
+ // Optional byte-range of the partition file; -1 means "whole file".
+ long offset = (offsetList != null && !offsetList.isEmpty()) ? Long.parseLong(offsetList.get(0)) : -1L;
+ long length = (lengthList != null && !lengthList.isEmpty()) ? Long.parseLong(lengthList.get(0)) : -1L;
+
+ if (!shuffleType.equals("h") && !shuffleType.equals("s") && taskIdList == null) {
+ sendError(ctx, "Required taskIds", BAD_REQUEST);
+ // FIX: previously fell through and NPE'd on taskIds.get(0) below.
+ return;
+ }
+
+ List<String> taskIds = splitMaps(taskIdList);
+
+ String queryBaseDir = queryId.toString() + "/output";
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("PullServer request param: shuffleType=" + shuffleType +
+ ", sid=" + sid + ", partId=" + partId + ", taskIds=" + taskIdList);
+
+ // the working dir of tajo worker for each query
+ LOG.debug("PullServer baseDir: " + conf.get(ConfVars.WORKER_TEMPORAL_DIR.varname) + "/" + queryBaseDir);
+ }
+
+ final List<FileChunk> chunks = Lists.newArrayList();
+
+ // if a subquery requires a range shuffle
+ if (shuffleType.equals("r")) {
+ String ta = taskIds.get(0);
+ if(!lDirAlloc.ifExists(queryBaseDir + "/" + sid + "/" + ta + "/output/", conf)){
+ // FIX: log the missing directory instead of the netty event object.
+ LOG.warn("Range shuffle output not found: " + queryBaseDir + "/" + sid + "/" + ta + "/output/");
+ sendError(ctx, NO_CONTENT);
+ return;
+ }
+ Path path = localFS.makeQualified(
+ lDirAlloc.getLocalPathToRead(queryBaseDir + "/" + sid + "/" + ta + "/output/", conf));
+ String startKey = params.get("start").get(0);
+ String endKey = params.get("end").get(0);
+ boolean last = params.get("final") != null;
+
+ FileChunk chunk;
+ try {
+ chunk = getFileCunks(path, startKey, endKey, last);
+ } catch (Throwable t) {
+ LOG.error("ERROR Request: " + request.getUri(), t);
+ sendError(ctx, "Cannot get file chunks to be sent", BAD_REQUEST);
+ return;
+ }
+ if (chunk != null) {
+ chunks.add(chunk);
+ }
+
+ // if a subquery requires a hash shuffle or a scattered hash shuffle
+ } else if (shuffleType.equals("h") || shuffleType.equals("s")) {
+ int partParentId = HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), (TajoConf) conf);
+ String partPath = queryBaseDir + "/" + sid + "/hash-shuffle/" + partParentId + "/" + partId;
+ if (!lDirAlloc.ifExists(partPath, conf)) {
+ LOG.warn("Partition shuffle file not exists: " + partPath);
+ sendError(ctx, NO_CONTENT);
+ return;
+ }
+
+ Path path = localFS.makeQualified(lDirAlloc.getLocalPathToRead(partPath, conf));
+
+ File file = new File(path.toUri());
+ long startPos = (offset >= 0 && length >= 0) ? offset : 0;
+ long readLen = (offset >= 0 && length >= 0) ? length : file.length();
+
+ if (startPos >= file.length()) {
+ String errorMessage = "Start pos[" + startPos + "] great than file length [" + file.length() + "]";
+ LOG.error(errorMessage);
+ sendError(ctx, errorMessage, BAD_REQUEST);
+ return;
+ }
+ LOG.info("RequestURL: " + request.getUri() + ", fileLen=" + file.length());
+ FileChunk chunk = new FileChunk(file, startPos, readLen);
+ chunks.add(chunk);
+ } else {
+ LOG.error("Unknown shuffle type: " + shuffleType);
+ sendError(ctx, "Unknown shuffle type:" + shuffleType, BAD_REQUEST);
+ return;
+ }
+
+ processingStatus.setNumFiles(chunks.size());
+ processingStatus.makeFileListTime = System.currentTimeMillis() - processingStatus.startTime;
+ // Write the content.
+ Channel ch = e.getChannel();
+ if (chunks.size() == 0) {
+ HttpResponse response = new DefaultHttpResponse(HTTP_1_1, NO_CONTENT);
+ ch.write(response);
+ if (!isKeepAlive(request)) {
+ ch.close();
+ }
+ } else {
+ FileChunk[] file = chunks.toArray(new FileChunk[chunks.size()]);
+ HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
+ long totalSize = 0;
+ for (FileChunk chunk : file) {
+ totalSize += chunk.length();
+ }
+ setContentLength(response, totalSize);
+
+ // Write the initial line and the header.
+ ch.write(response);
+
+ ChannelFuture writeFuture = null;
+
+ for (FileChunk chunk : file) {
+ writeFuture = sendFile(ctx, ch, chunk, request.getUri().toString());
+ if (writeFuture == null) {
+ sendError(ctx, NOT_FOUND);
+ return;
+ }
+ }
+
+ // Decide whether to close the connection or not.
+ if (!isKeepAlive(request)) {
+ // Close the connection when the whole content is written out.
+ writeFuture.addListener(ChannelFutureListener.CLOSE);
+ }
+ }
+ }
+
  /**
   * Writes one {@link FileChunk} to the given channel and returns the write future.
   *
   * Two transfer paths exist: when no {@link SslHandler} is in the pipeline, the
   * chunk is sent as a {@link FadvisedFileRegion} (zero-copy transfer); otherwise
   * it is sent as a {@link FadvisedChunkedFile}, because zero copy cannot be used
   * over TLS.
   *
   * @param ctx        handler context of the current request (unused here, kept for symmetry)
   * @param ch         channel to write the file data to
   * @param file       the file region (path, offset, length) to transfer
   * @param requestUri request URI, passed to the close listener for logging/metrics
   * @return the future of the data write, or {@code null} if the file could not be
   *         opened or the write could not be started (callers treat null as NOT_FOUND)
   * @throws IOException if closing the spill file after a failed write itself fails
   */
  private ChannelFuture sendFile(ChannelHandlerContext ctx,
                                 Channel ch,
                                 FileChunk file,
                                 String requestUri) throws IOException {
    long startTime = System.currentTimeMillis();
    RandomAccessFile spill = null;
    ChannelFuture writeFuture;
    try {
      spill = new RandomAccessFile(file.getFile(), "r");
      if (ch.getPipeline().get(SslHandler.class) == null) {
        // Plain HTTP: use zero-copy sendfile via a FileRegion.
        final FadvisedFileRegion filePart = new FadvisedFileRegion(spill,
            file.startOffset(), file.length(), manageOsCache, readaheadLength,
            readaheadPool, file.getFile().getAbsolutePath());
        writeFuture = ch.write(filePart);
        // The listener presumably releases filePart (and thus the spill file)
        // once the transfer completes — TODO confirm in FileCloseListener.
        writeFuture.addListener(new FileCloseListener(filePart, requestUri, startTime, TajoPullServerService.this));
      } else {
        // HTTPS cannot be done with zero copy; stream the file in chunks instead.
        final FadvisedChunkedFile chunk = new FadvisedChunkedFile(spill,
            file.startOffset(), file.length(), sslFileBufferSize,
            manageOsCache, readaheadLength, readaheadPool,
            file.getFile().getAbsolutePath());
        writeFuture = ch.write(chunk);
      }
    } catch (FileNotFoundException e) {
      LOG.info(file.getFile() + " not found");
      return null;
    } catch (Throwable e) {
      if (spill != null) {
        // Close the already-opened file before bailing out, otherwise the
        // descriptor would leak for the lifetime of this request.
        spill.close();
      }
      return null;
    }
    // Metrics are updated optimistically: the write was started, not yet completed.
    metrics.shuffleConnections.incr();
    metrics.shuffleOutputBytes.incr(file.length()); // optimistic
    return writeFuture;
  }
+
  /**
   * Sends an HTTP error response with an empty message body text and closes
   * the connection. Convenience overload of {@link #sendError(ChannelHandlerContext, String, HttpResponseStatus)}.
   *
   * @param ctx    context whose channel receives the error response
   * @param status the HTTP status to report
   */
  private void sendError(ChannelHandlerContext ctx,
                         HttpResponseStatus status) {
    sendError(ctx, "", status);
  }
+
+ private void sendError(ChannelHandlerContext ctx, String message,
+ HttpResponseStatus status) {
+ HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status);
+ response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8");
+ response.setContent(
+ ChannelBuffers.copiedBuffer(message, CharsetUtil.UTF_8));
+
+ // Close the connection as soon as the error message is sent.
+ ctx.getChannel().write(response).addListener(ChannelFutureListener.CLOSE);
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
+ throws Exception {
+ LOG.error(e.getCause().getMessage(), e.getCause());
+ //if channel.close() is not called, never closed files in this request
+ if (ctx.getChannel().isConnected()){
+ ctx.getChannel().close();
+ }
+ }
+ }
+
+ public static FileChunk getFileCunks(Path outDir,
+ String startKey,
+ String endKey,
+ boolean last) throws IOException {
+ BSTIndex index = new BSTIndex(new TajoConf());
+ BSTIndex.BSTIndexReader idxReader =
+ index.getIndexReader(new Path(outDir, "index"));
+ idxReader.open();
+ Schema keySchema = idxReader.getKeySchema();
+ TupleComparator comparator = idxReader.getComparator();
+
+ LOG.info("BSTIndex is loaded from disk (" + idxReader.getFirstKey() + ", "
+ + idxReader.getLastKey());
+
+ File data = new File(URI.create(outDir.toUri() + "/output"));
+ byte [] startBytes = Base64.decodeBase64(startKey);
+ byte [] endBytes = Base64.decodeBase64(endKey);
+
+ RowStoreDecoder decoder = RowStoreUtil.createDecoder(keySchema);
+ Tuple start;
+ Tuple end;
+ try {
+ start = decoder.toTuple(startBytes);
+ } catch (Throwable t) {
+ throw new IllegalArgumentException("StartKey: " + startKey
+ + ", decoded byte size: " + startBytes.length, t);
+ }
+
+ try {
+ end = decoder.toTuple(endBytes);
+ } catch (Throwable t) {
+ throw new IllegalArgumentException("EndKey: " + endKey
+ + ", decoded byte size: " + endBytes.length, t);
+ }
+
+ LOG.info("GET Request for " + data.getAbsolutePath() + " (start="+start+", end="+ end +
+ (last ? ", last=true" : "") + ")");
+
+ if (idxReader.getFirstKey() == null && idxReader.getLastKey() == null) { // if # of rows is zero
+ LOG.info("There is no contents");
+ return null;
+ }
+
+ if (comparator.compare(end, idxReader.getFirstKey()) < 0 ||
+ comparator.compare(idxReader.getLastKey(), start) < 0) {
+ LOG.warn("Out of Scope (indexed data [" + idxReader.getFirstKey() + ", " + idxReader.getLastKey() +
+ "], but request start:" + start + ", end: " + end);
+ return null;
+ }
+
+ long startOffset;
+ long endOffset;
+ try {
+ startOffset = idxReader.find(start);
+ } catch (IOException ioe) {
+ LOG.error("State Dump (the requested range: "
+ + "[" + start + ", " + end +")" + ", idx min: "
+ + idxReader.getFirstKey() + ", idx max: "
+ + idxReader.getLastKey());
+ throw ioe;
+ }
+ try {
+ endOffset = idxReader.find(end);
+ if (endOffset == -1) {
+ endOffset = idxReader.find(end, true);
+ }
+ } catch (IOException ioe) {
+ LOG.error("State Dump (the requested range: "
+ + "[" + start + ", " + end +")" + ", idx min: "
+ + idxReader.getFirstKey() + ", idx max: "
+ + idxReader.getLastKey());
+ throw ioe;
+ }
+
+ // if startOffset == -1 then case 2-1 or case 3
+ if (startOffset == -1) { // this is a hack
+ // if case 2-1 or case 3
+ try {
+ startOffset = idxReader.find(start, true);
+ } catch (IOException ioe) {
+ LOG.error("State Dump (the requested range: "
+ + "[" + start + ", " + end +")" + ", idx min: "
+ + idxReader.getFirstKey() + ", idx max: "
+ + idxReader.getLastKey());
+ throw ioe;
+ }
+ }
+
+ if (startOffset == -1) {
+ throw new IllegalStateException("startOffset " + startOffset + " is negative \n" +
+ "State Dump (the requested range: "
+ + "[" + start + ", " + end +")" + ", idx min: " + idxReader.getFirstKey() + ", idx max: "
+ + idxReader.getLastKey());
+ }
+
+ // if greater than indexed values
+ if (last || (endOffset == -1
+ && comparator.compare(idxReader.getLastKey(), end) < 0)) {
+ endOffset = data.length();
+ }
+
+ idxReader.close();
+
+ FileChunk chunk = new FileChunk(data, startOffset, endOffset - startOffset);
+ LOG.info("Retrieve File Chunk: " + chunk);
+ return chunk;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/AdvancedDataRetriever.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/AdvancedDataRetriever.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/AdvancedDataRetriever.java
new file mode 100644
index 0000000..67e7423
--- /dev/null
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/AdvancedDataRetriever.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.pullserver.retriever;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.QueryUnitId;
+import org.apache.tajo.pullserver.FileAccessForbiddenException;
+import org.apache.tajo.util.TajoIdUtils;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.handler.codec.http.HttpRequest;
+import org.jboss.netty.handler.codec.http.QueryStringDecoder;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+public class AdvancedDataRetriever implements DataRetriever {
+ private final Log LOG = LogFactory.getLog(AdvancedDataRetriever.class);
+ private final Map<String, RetrieverHandler> handlerMap = Maps.newConcurrentMap();
+
+ public AdvancedDataRetriever() {
+ }
+
+ public void register(String taskAttemptId, RetrieverHandler handler) {
+ synchronized (handlerMap) {
+ if (!handlerMap.containsKey(taskAttemptId)) {
+ handlerMap.put(taskAttemptId, handler);
+ }
+ }
+ }
+
+ public void unregister(String taskAttemptId) {
+ synchronized (handlerMap) {
+ if (handlerMap.containsKey(taskAttemptId)) {
+ handlerMap.remove(taskAttemptId);
+ }
+ }
+ }
+
+ @Override
+ public FileChunk [] handle(ChannelHandlerContext ctx, HttpRequest request)
+ throws IOException {
+
+ final Map<String, List<String>> params =
+ new QueryStringDecoder(request.getUri()).getParameters();
+
+ if (!params.containsKey("qid")) {
+ throw new FileNotFoundException("No such qid: " + params.containsKey("qid"));
+ }
+
+ if (params.containsKey("sid")) {
+ List<FileChunk> chunks = Lists.newArrayList();
+ List<String> queryUnidIds = splitMaps(params.get("qid"));
+ for (String eachQueryUnitId : queryUnidIds) {
+ String[] queryUnitIdSeqTokens = eachQueryUnitId.split("_");
+ ExecutionBlockId ebId = TajoIdUtils.createExecutionBlockId(params.get("sid").get(0));
+ QueryUnitId quid = new QueryUnitId(ebId, Integer.parseInt(queryUnitIdSeqTokens[0]));
+
+ QueryUnitAttemptId attemptId = new QueryUnitAttemptId(quid, Integer.parseInt(queryUnitIdSeqTokens[1]));
+
+ RetrieverHandler handler = handlerMap.get(attemptId.toString());
+ FileChunk chunk = handler.get(params);
+ chunks.add(chunk);
+ }
+ return chunks.toArray(new FileChunk[chunks.size()]);
+ } else {
+ RetrieverHandler handler = handlerMap.get(params.get("qid").get(0));
+ FileChunk chunk = handler.get(params);
+ if (chunk == null) {
+ if (params.containsKey("qid")) { // if there is no content corresponding to the query
+ return null;
+ } else { // if there is no
+ throw new FileNotFoundException("No such a file corresponding to " + params.get("qid"));
+ }
+ }
+
+ File file = chunk.getFile();
+ if (file.isHidden() || !file.exists()) {
+ throw new FileNotFoundException("No such file: " + file.getAbsolutePath());
+ }
+ if (!file.isFile()) {
+ throw new FileAccessForbiddenException(file.getAbsolutePath() + " is not file");
+ }
+
+ return new FileChunk[] {chunk};
+ }
+ }
+
+ private List<String> splitMaps(List<String> qids) {
+ if (null == qids) {
+ LOG.error("QueryUnitId is EMPTY");
+ return null;
+ }
+
+ final List<String> ret = new ArrayList<String>();
+ for (String qid : qids) {
+ Collections.addAll(ret, qid.split(","));
+ }
+ return ret;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/DataRetriever.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/DataRetriever.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/DataRetriever.java
new file mode 100644
index 0000000..8f55f7b
--- /dev/null
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/DataRetriever.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.pullserver.retriever;
+
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.handler.codec.http.HttpRequest;
+
+import java.io.IOException;
+
/**
 * Strategy interface for resolving an HTTP pull-server request to the file
 * chunks that should be served back to the client.
 */
public interface DataRetriever {
  /**
   * Maps the given request to zero or more file chunks.
   *
   * @param ctx     channel context of the request
   * @param request the incoming HTTP request
   * @return the chunks to serve; implementations may return {@code null} when
   *         there is no matching content — TODO confirm callers handle null
   * @throws IOException if the requested content cannot be located or accessed
   */
  FileChunk [] handle(ChannelHandlerContext ctx, HttpRequest request)
      throws IOException;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/DirectoryRetriever.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/DirectoryRetriever.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/DirectoryRetriever.java
new file mode 100644
index 0000000..dc63929
--- /dev/null
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/DirectoryRetriever.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.pullserver.retriever;
+
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.handler.codec.http.HttpRequest;
+import org.apache.tajo.pullserver.FileAccessForbiddenException;
+import org.apache.tajo.pullserver.HttpDataServerHandler;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+public class DirectoryRetriever implements DataRetriever {
+ public String baseDir;
+
+ public DirectoryRetriever(String baseDir) {
+ this.baseDir = baseDir;
+ }
+
+ @Override
+ public FileChunk [] handle(ChannelHandlerContext ctx, HttpRequest request)
+ throws IOException {
+ final String path = HttpDataServerHandler.sanitizeUri(request.getUri());
+ if (path == null) {
+ throw new IllegalArgumentException("Wrong path: " +path);
+ }
+
+ File file = new File(baseDir, path);
+ if (file.isHidden() || !file.exists()) {
+ throw new FileNotFoundException("No such file: " + baseDir + "/" + path);
+ }
+ if (!file.isFile()) {
+ throw new FileAccessForbiddenException("No such file: "
+ + baseDir + "/" + path);
+ }
+
+ return new FileChunk[] {new FileChunk(file, 0, file.length())};
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/FileChunk.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/FileChunk.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/FileChunk.java
new file mode 100644
index 0000000..67cff21
--- /dev/null
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/FileChunk.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.pullserver.retriever;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+
/**
 * Describes a byte range of a local file: the bytes
 * [{@code startOffset}, {@code startOffset + length}) of {@code file}.
 *
 * Mutable length, remote-origin flag, and execution block id support the pull
 * server's shuffle bookkeeping.
 */
public class FileChunk {
  private final File file;
  private final long startOffset;
  private long length;

  // TRUE if this.file is created by getting data from a remote host (e.g., by
  // HttpRequest). FALSE otherwise.
  private boolean fromRemote;

  // String form of the ExecutionBlockId this chunk belongs to.
  private String ebId;

  /**
   * @param file        the backing local file
   * @param startOffset byte offset where the chunk begins
   * @param length      number of bytes in the chunk
   * @throws FileNotFoundException declared for API compatibility; this
   *         constructor does not open the file
   */
  public FileChunk(File file, long startOffset, long length) throws FileNotFoundException {
    this.file = file;
    this.startOffset = startOffset;
    this.length = length;
  }

  public File getFile() {
    return file;
  }

  public long startOffset() {
    return startOffset;
  }

  public long length() {
    return length;
  }

  public void setLength(long newLength) {
    length = newLength;
  }

  public boolean fromRemote() {
    return fromRemote;
  }

  public void setFromRemote(boolean newVal) {
    fromRemote = newVal;
  }

  public String getEbId() {
    return ebId;
  }

  public void setEbId(String newVal) {
    ebId = newVal;
  }

  @Override
  public String toString() {
    StringBuilder repr = new StringBuilder()
        .append(" (start=").append(startOffset())
        .append(", length=").append(length)
        .append(", fromRemote=").append(fromRemote)
        .append(", ebId=").append(ebId)
        .append(") ")
        .append(file.getAbsolutePath());
    return repr.toString();
  }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/RetrieverHandler.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/RetrieverHandler.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/RetrieverHandler.java
new file mode 100644
index 0000000..5567c0d
--- /dev/null
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/RetrieverHandler.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.pullserver.retriever;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
/**
 * Produces the file chunk for a single registered task, given the decoded
 * query parameters of a pull-server request.
 */
public interface RetrieverHandler {
  /**
   *
   * @param kvs url-decoded key/value pairs
   * @return a desired part of a file, or {@code null} when there is no content
   *         for the query — TODO confirm against implementations
   * @throws java.io.IOException
   */
  public FileChunk get(Map<String, List<String>> kvs) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/tajo-yarn-pullserver/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-yarn-pullserver/pom.xml b/tajo-yarn-pullserver/pom.xml
deleted file mode 100644
index a7644a1..0000000
--- a/tajo-yarn-pullserver/pom.xml
+++ /dev/null
@@ -1,146 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <parent>
- <artifactId>tajo-project</artifactId>
- <groupId>org.apache.tajo</groupId>
- <version>0.9.1-SNAPSHOT</version>
- <relativePath>../tajo-project</relativePath>
- </parent>
- <modelVersion>4.0.0</modelVersion>
- <name>Tajo Core PullServer</name>
- <artifactId>tajo-yarn-pullserver</artifactId>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.rat</groupId>
- <artifactId>apache-rat-plugin</artifactId>
- <executions>
- <execution>
- <phase>verify</phase>
- <goals>
- <goal>check</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.tajo</groupId>
- <artifactId>tajo-rpc</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.tajo</groupId>
- <artifactId>tajo-catalog-common</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.tajo</groupId>
- <artifactId>tajo-storage</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-yarn-server-nodemanager</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-yarn-common</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-mapreduce-client-shuffle</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- <scope>provided</scope>
- <exclusions>
- <exclusion>
- <groupId>commons-el</groupId>
- <artifactId>commons-el</artifactId>
- </exclusion>
- <exclusion>
- <groupId>tomcat</groupId>
- <artifactId>jasper-runtime</artifactId>
- </exclusion>
- <exclusion>
- <groupId>tomcat</groupId>
- <artifactId>jasper-compiler</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.mortbay.jetty</groupId>
- <artifactId>jsp-2.1-jetty</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- </dependencies>
-
- <profiles>
- <profile>
- <id>docs</id>
- <activation>
- <activeByDefault>false</activeByDefault>
- </activation>
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-javadoc-plugin</artifactId>
- <executions>
- <execution>
- <!-- build javadoc jars per jar for publishing to maven -->
- <id>module-javadocs</id>
- <phase>package</phase>
- <goals>
- <goal>jar</goal>
- </goals>
- <configuration>
- <destDir>${project.build.directory}</destDir>
- </configuration>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
- </profile>
- </profiles>
-
- <reporting>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-project-info-reports-plugin</artifactId>
- <version>2.4</version>
- <configuration>
- <dependencyLocationsEnabled>false</dependencyLocationsEnabled>
- </configuration>
- </plugin>
- </plugins>
- </reporting>
-
-</project>
http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedChunkedFile.java
----------------------------------------------------------------------
diff --git a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedChunkedFile.java b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedChunkedFile.java
deleted file mode 100644
index b0b8d18..0000000
--- a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedChunkedFile.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.pullserver;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.ReadaheadPool;
-import org.apache.hadoop.io.nativeio.NativeIO;
-import org.jboss.netty.handler.stream.ChunkedFile;
-
-import java.io.FileDescriptor;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-
-public class FadvisedChunkedFile extends ChunkedFile {
-
- private static final Log LOG = LogFactory.getLog(FadvisedChunkedFile.class);
-
- private final boolean manageOsCache;
- private final int readaheadLength;
- private final ReadaheadPool readaheadPool;
- private final FileDescriptor fd;
- private final String identifier;
-
- private ReadaheadPool.ReadaheadRequest readaheadRequest;
-
- public FadvisedChunkedFile(RandomAccessFile file, long position, long count,
- int chunkSize, boolean manageOsCache, int readaheadLength,
- ReadaheadPool readaheadPool, String identifier) throws IOException {
- super(file, position, count, chunkSize);
- this.manageOsCache = manageOsCache;
- this.readaheadLength = readaheadLength;
- this.readaheadPool = readaheadPool;
- this.fd = file.getFD();
- this.identifier = identifier;
- }
-
- @Override
- public Object nextChunk() throws Exception {
- if (PullServerUtil.isNativeIOPossible() && manageOsCache && readaheadPool != null) {
- readaheadRequest = readaheadPool
- .readaheadStream(identifier, fd, getCurrentOffset(), readaheadLength,
- getEndOffset(), readaheadRequest);
- }
- return super.nextChunk();
- }
-
- @Override
- public void close() throws Exception {
- if (readaheadRequest != null) {
- readaheadRequest.cancel();
- }
- if (PullServerUtil.isNativeIOPossible() && manageOsCache && getEndOffset() - getStartOffset() > 0) {
- try {
- PullServerUtil.posixFadviseIfPossible(identifier,
- fd,
- getStartOffset(), getEndOffset() - getStartOffset(),
- NativeIO.POSIX.POSIX_FADV_DONTNEED);
- } catch (Throwable t) {
- LOG.warn("Failed to manage OS cache for " + identifier, t);
- }
- }
- super.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedFileRegion.java
----------------------------------------------------------------------
diff --git a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedFileRegion.java b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedFileRegion.java
deleted file mode 100644
index 18cf4b6..0000000
--- a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedFileRegion.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.pullserver;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.ReadaheadPool;
-import org.apache.hadoop.io.nativeio.NativeIO;
-import org.jboss.netty.channel.DefaultFileRegion;
-
-import java.io.FileDescriptor;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.nio.channels.WritableByteChannel;
-
-public class FadvisedFileRegion extends DefaultFileRegion {
-
- private static final Log LOG = LogFactory.getLog(FadvisedFileRegion.class);
-
- private final boolean manageOsCache;
- private final int readaheadLength;
- private final ReadaheadPool readaheadPool;
- private final FileDescriptor fd;
- private final String identifier;
- private final long count;
- private final long position;
- private final int shuffleBufferSize;
- private final boolean shuffleTransferToAllowed;
- private final FileChannel fileChannel;
-
- private ReadaheadPool.ReadaheadRequest readaheadRequest;
- public static final int DEFAULT_SHUFFLE_BUFFER_SIZE = 128 * 1024;
-
- public FadvisedFileRegion(RandomAccessFile file, long position, long count,
- boolean manageOsCache, int readaheadLength, ReadaheadPool readaheadPool,
- String identifier) throws IOException {
- this(file, position, count, manageOsCache, readaheadLength, readaheadPool,
- identifier, DEFAULT_SHUFFLE_BUFFER_SIZE, true);
- }
-
- public FadvisedFileRegion(RandomAccessFile file, long position, long count,
- boolean manageOsCache, int readaheadLength, ReadaheadPool readaheadPool,
- String identifier, int shuffleBufferSize,
- boolean shuffleTransferToAllowed) throws IOException {
- super(file.getChannel(), position, count);
- this.manageOsCache = manageOsCache;
- this.readaheadLength = readaheadLength;
- this.readaheadPool = readaheadPool;
- this.fd = file.getFD();
- this.identifier = identifier;
- this.fileChannel = file.getChannel();
- this.count = count;
- this.position = position;
- this.shuffleBufferSize = shuffleBufferSize;
- this.shuffleTransferToAllowed = shuffleTransferToAllowed;
- }
-
- @Override
- public long transferTo(WritableByteChannel target, long position)
- throws IOException {
- if (PullServerUtil.isNativeIOPossible() && manageOsCache && readaheadPool != null) {
- readaheadRequest = readaheadPool.readaheadStream(identifier, fd,
- getPosition() + position, readaheadLength,
- getPosition() + getCount(), readaheadRequest);
- }
-
- if(this.shuffleTransferToAllowed) {
- return super.transferTo(target, position);
- } else {
- return customShuffleTransfer(target, position);
- }
- }
-
- /**
- * This method transfers data using local buffer. It transfers data from
- * a disk to a local buffer in memory, and then it transfers data from the
- * buffer to the target. This is used only if transferTo is disallowed in
- * the configuration file. super.TransferTo does not perform well on Windows
- * due to a small IO request generated. customShuffleTransfer can control
- * the size of the IO requests by changing the size of the intermediate
- * buffer.
- */
- @VisibleForTesting
- long customShuffleTransfer(WritableByteChannel target, long position)
- throws IOException {
- long actualCount = this.count - position;
- if (actualCount < 0 || position < 0) {
- throw new IllegalArgumentException(
- "position out of range: " + position +
- " (expected: 0 - " + (this.count - 1) + ')');
- }
- if (actualCount == 0) {
- return 0L;
- }
-
- long trans = actualCount;
- int readSize;
- ByteBuffer byteBuffer = ByteBuffer.allocate(this.shuffleBufferSize);
-
- while(trans > 0L &&
- (readSize = fileChannel.read(byteBuffer, this.position+position)) > 0) {
- //adjust counters and buffer limit
- if(readSize < trans) {
- trans -= readSize;
- position += readSize;
- byteBuffer.flip();
- } else {
- //We can read more than we need if the actualCount is not multiple
- //of the byteBuffer size and file is big enough. In that case we cannot
- //use flip method but we need to set buffer limit manually to trans.
- byteBuffer.limit((int)trans);
- byteBuffer.position(0);
- position += trans;
- trans = 0;
- }
-
- //write data to the target
- while(byteBuffer.hasRemaining()) {
- target.write(byteBuffer);
- }
-
- byteBuffer.clear();
- }
-
- return actualCount - trans;
- }
-
-
- @Override
- public void releaseExternalResources() {
- if (readaheadRequest != null) {
- readaheadRequest.cancel();
- }
- super.releaseExternalResources();
- }
-
- /**
- * Call when the transfer completes successfully so we can advise the OS that
- * we don't need the region to be cached anymore.
- */
- public void transferSuccessful() {
- if (PullServerUtil.isNativeIOPossible() && manageOsCache && getCount() > 0) {
- try {
- PullServerUtil.posixFadviseIfPossible(identifier, fd, getPosition(), getCount(),
- NativeIO.POSIX.POSIX_FADV_DONTNEED);
- } catch (Throwable t) {
- LOG.warn("Failed to manage OS cache for " + identifier, t);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/FileAccessForbiddenException.java
----------------------------------------------------------------------
diff --git a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/FileAccessForbiddenException.java b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/FileAccessForbiddenException.java
deleted file mode 100644
index c703f6f..0000000
--- a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/FileAccessForbiddenException.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.pullserver;
-
-import java.io.IOException;
-
-public class FileAccessForbiddenException extends IOException {
- private static final long serialVersionUID = -3383272565826389213L;
-
- public FileAccessForbiddenException() {
- }
-
- public FileAccessForbiddenException(String message) {
- super(message);
- }
-
- public FileAccessForbiddenException(Throwable cause) {
- super(cause);
- }
-
- public FileAccessForbiddenException(String message, Throwable cause) {
- super(message, cause);
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/FileCloseListener.java
----------------------------------------------------------------------
diff --git a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/FileCloseListener.java b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/FileCloseListener.java
deleted file mode 100644
index 236db89..0000000
--- a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/FileCloseListener.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.pullserver;
-
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
-
-public class FileCloseListener implements ChannelFutureListener {
-
- private FadvisedFileRegion filePart;
- private String requestUri;
- private TajoPullServerService pullServerService;
- private long startTime;
-
- public FileCloseListener(FadvisedFileRegion filePart,
- String requestUri,
- long startTime,
- TajoPullServerService pullServerService) {
- this.filePart = filePart;
- this.requestUri = requestUri;
- this.pullServerService = pullServerService;
- this.startTime = startTime;
- }
-
- // TODO error handling; distinguish IO/connection failures,
- // attribute to appropriate spill output
- @Override
- public void operationComplete(ChannelFuture future) {
- if(future.isSuccess()){
- filePart.transferSuccessful();
- }
- filePart.releaseExternalResources();
- if (pullServerService != null) {
- pullServerService.completeFileChunk(filePart, requestUri, startTime);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerHandler.java
----------------------------------------------------------------------
diff --git a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerHandler.java b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerHandler.java
deleted file mode 100644
index 31db15c..0000000
--- a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerHandler.java
+++ /dev/null
@@ -1,245 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.pullserver;
-
-import com.google.common.collect.Lists;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
-import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.pullserver.retriever.DataRetriever;
-import org.apache.tajo.pullserver.retriever.FileChunk;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.*;
-import org.jboss.netty.handler.codec.frame.TooLongFrameException;
-import org.jboss.netty.handler.codec.http.*;
-import org.jboss.netty.handler.ssl.SslHandler;
-import org.jboss.netty.handler.stream.ChunkedFile;
-import org.jboss.netty.util.CharsetUtil;
-
-import java.io.*;
-import java.net.URLDecoder;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
-import static org.jboss.netty.handler.codec.http.HttpHeaders.isKeepAlive;
-import static org.jboss.netty.handler.codec.http.HttpHeaders.setContentLength;
-import static org.jboss.netty.handler.codec.http.HttpMethod.GET;
-import static org.jboss.netty.handler.codec.http.HttpResponseStatus.*;
-import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;
-
-public class HttpDataServerHandler extends SimpleChannelUpstreamHandler {
- private final static Log LOG = LogFactory.getLog(HttpDataServerHandler.class);
-
- Map<ExecutionBlockId, DataRetriever> retrievers =
- new ConcurrentHashMap<ExecutionBlockId, DataRetriever>();
- private String userName;
- private String appId;
-
- public HttpDataServerHandler(String userName, String appId) {
- this.userName= userName;
- this.appId = appId;
- }
-
- @Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
- throws Exception {
- HttpRequest request = (HttpRequest) e.getMessage();
- if (request.getMethod() != GET) {
- sendError(ctx, METHOD_NOT_ALLOWED);
- return;
- }
-
- String base =
- ContainerLocalizer.USERCACHE + "/" + userName + "/"
- + ContainerLocalizer.APPCACHE + "/"
- + appId + "/output" + "/";
-
- final Map<String, List<String>> params =
- new QueryStringDecoder(request.getUri()).getParameters();
-
- List<FileChunk> chunks = Lists.newArrayList();
- List<String> taskIds = splitMaps(params.get("ta"));
- int sid = Integer.valueOf(params.get("sid").get(0));
- int partitionId = Integer.valueOf(params.get("p").get(0));
- for (String ta : taskIds) {
-
- File file = new File(base + "/" + sid + "/" + ta + "/output/" + partitionId);
- FileChunk chunk = new FileChunk(file, 0, file.length());
- chunks.add(chunk);
- }
-
- FileChunk[] file = chunks.toArray(new FileChunk[chunks.size()]);
-// try {
-// file = retriever.handle(ctx, request);
-// } catch (FileNotFoundException fnf) {
-// LOG.error(fnf);
-// sendError(ctx, NOT_FOUND);
-// return;
-// } catch (IllegalArgumentException iae) {
-// LOG.error(iae);
-// sendError(ctx, BAD_REQUEST);
-// return;
-// } catch (FileAccessForbiddenException fafe) {
-// LOG.error(fafe);
-// sendError(ctx, FORBIDDEN);
-// return;
-// } catch (IOException ioe) {
-// LOG.error(ioe);
-// sendError(ctx, INTERNAL_SERVER_ERROR);
-// return;
-// }
-
- // Write the content.
- Channel ch = e.getChannel();
- if (file == null) {
- HttpResponse response = new DefaultHttpResponse(HTTP_1_1, NO_CONTENT);
- ch.write(response);
- if (!isKeepAlive(request)) {
- ch.close();
- }
- } else {
- HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
- long totalSize = 0;
- for (FileChunk chunk : file) {
- totalSize += chunk.length();
- }
- setContentLength(response, totalSize);
-
- // Write the initial line and the header.
- ch.write(response);
-
- ChannelFuture writeFuture = null;
-
- for (FileChunk chunk : file) {
- writeFuture = sendFile(ctx, ch, chunk);
- if (writeFuture == null) {
- sendError(ctx, NOT_FOUND);
- return;
- }
- }
-
- // Decide whether to close the connection or not.
- if (!isKeepAlive(request)) {
- // Close the connection when the whole content is written out.
- writeFuture.addListener(ChannelFutureListener.CLOSE);
- }
- }
- }
-
- private ChannelFuture sendFile(ChannelHandlerContext ctx,
- Channel ch,
- FileChunk file) throws IOException {
- RandomAccessFile raf;
- try {
- raf = new RandomAccessFile(file.getFile(), "r");
- } catch (FileNotFoundException fnfe) {
- return null;
- }
-
- ChannelFuture writeFuture;
- if (ch.getPipeline().get(SslHandler.class) != null) {
- // Cannot use zero-copy with HTTPS.
- writeFuture = ch.write(new ChunkedFile(raf, file.startOffset(),
- file.length(), 8192));
- } else {
- // No encryption - use zero-copy.
- final FileRegion region = new DefaultFileRegion(raf.getChannel(),
- file.startOffset(), file.length());
- writeFuture = ch.write(region);
- writeFuture.addListener(new ChannelFutureListener() {
- public void operationComplete(ChannelFuture future) {
- region.releaseExternalResources();
- }
- });
- }
-
- return writeFuture;
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
- throws Exception {
- Channel ch = e.getChannel();
- Throwable cause = e.getCause();
- if (cause instanceof TooLongFrameException) {
- sendError(ctx, BAD_REQUEST);
- return;
- }
-
- cause.printStackTrace();
- if (ch.isConnected()) {
- sendError(ctx, INTERNAL_SERVER_ERROR);
- }
- }
-
- public static String sanitizeUri(String uri) {
- // Decode the path.
- try {
- uri = URLDecoder.decode(uri, "UTF-8");
- } catch (UnsupportedEncodingException e) {
- try {
- uri = URLDecoder.decode(uri, "ISO-8859-1");
- } catch (UnsupportedEncodingException e1) {
- throw new Error();
- }
- }
-
- // Convert file separators.
- uri = uri.replace('/', File.separatorChar);
-
- // Simplistic dumb security check.
- // You will have to do something serious in the production environment.
- if (uri.contains(File.separator + ".")
- || uri.contains("." + File.separator) || uri.startsWith(".")
- || uri.endsWith(".")) {
- return null;
- }
-
- // Convert to absolute path.
- return uri;
- }
-
- private void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) {
- HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status);
- response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8");
- response.setContent(ChannelBuffers.copiedBuffer(
- "Failure: " + status.toString() + "\r\n", CharsetUtil.UTF_8));
-
- // Close the connection as soon as the error message is sent.
- ctx.getChannel().write(response).addListener(ChannelFutureListener.CLOSE);
- }
-
- private List<String> splitMaps(List<String> qids) {
- if (null == qids) {
- LOG.error("QueryUnitId is EMPTY");
- return null;
- }
-
- final List<String> ret = new ArrayList<String>();
- for (String qid : qids) {
- Collections.addAll(ret, qid.split(","));
- }
- return ret;
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerPipelineFactory.java
----------------------------------------------------------------------
diff --git a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerPipelineFactory.java b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerPipelineFactory.java
deleted file mode 100644
index 4c8bd8b..0000000
--- a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerPipelineFactory.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.pullserver;
-
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.handler.codec.http.HttpContentCompressor;
-import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
-import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
-import org.jboss.netty.handler.stream.ChunkedWriteHandler;
-
-import static org.jboss.netty.channel.Channels.pipeline;
-
-public class HttpDataServerPipelineFactory implements ChannelPipelineFactory {
- private String userName;
- private String appId;
- public HttpDataServerPipelineFactory(String userName, String appId) {
- this.userName = userName;
- this.appId = appId;
- }
-
- public ChannelPipeline getPipeline() throws Exception {
- // Create a default pipeline implementation.
- ChannelPipeline pipeline = pipeline();
-
- // Uncomment the following line if you want HTTPS
- // SSLEngine engine =
- // SecureChatSslContextFactory.getServerContext().createSSLEngine();
- // engine.setUseClientMode(false);
- // pipeline.addLast("ssl", new SslHandler(engine));
-
- pipeline.addLast("decoder", new HttpRequestDecoder());
- //pipeline.addLast("aggregator", new HttpChunkAggregator(65536));
- pipeline.addLast("encoder", new HttpResponseEncoder());
- pipeline.addLast("chunkedWriter", new ChunkedWriteHandler());
- pipeline.addLast("deflater", new HttpContentCompressor());
- pipeline.addLast("handler", new HttpDataServerHandler(userName, appId));
- return pipeline;
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/b5aa7804/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/HttpUtil.java
----------------------------------------------------------------------
diff --git a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/HttpUtil.java b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/HttpUtil.java
deleted file mode 100644
index 2cbb101..0000000
--- a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/HttpUtil.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.pullserver;
-
-import com.google.common.collect.Maps;
-
-import java.io.UnsupportedEncodingException;
-import java.net.URI;
-import java.net.URLEncoder;
-import java.util.Map;
-
-public class HttpUtil {
- public static Map<String,String> getParams(URI uri) throws UnsupportedEncodingException {
- return getParamsFromQuery(uri.getQuery());
- }
-
- /**
- * It parses a query string into key/value pairs
- *
- * @param queryString decoded query string
- * @return key/value pairs parsed from a given query string
- * @throws java.io.UnsupportedEncodingException
- */
- public static Map<String, String> getParamsFromQuery(String queryString) throws UnsupportedEncodingException {
- String [] queries = queryString.split("&");
-
- Map<String,String> params = Maps.newHashMap();
- String [] param;
- for (String q : queries) {
- param = q.split("=");
- params.put(param[0], param[1]);
- }
-
- return params;
- }
-
- public static String buildQuery(Map<String,String> params) throws UnsupportedEncodingException {
- StringBuilder sb = new StringBuilder();
-
- boolean first = true;
- for (Map.Entry<String,String> param : params.entrySet()) {
- if (!first) {
- sb.append("&");
- }
- sb.append(URLEncoder.encode(param.getKey(), "UTF-8")).
- append("=").
- append(URLEncoder.encode(param.getValue(), "UTF-8"));
- first = false;
- }
-
- return sb.toString();
- }
-}