You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2016/04/27 06:27:08 UTC
[1/4] tajo git commit: TAJO-2122: PullServer as an Auxiliary service
of Yarn.
Repository: tajo
Updated Branches:
refs/heads/master 71193b218 -> 73ac4b87d
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-yarn/src/main/java/org/apache/tajo/yarn/FadvisedFileRegion.java
----------------------------------------------------------------------
diff --git a/tajo-yarn/src/main/java/org/apache/tajo/yarn/FadvisedFileRegion.java b/tajo-yarn/src/main/java/org/apache/tajo/yarn/FadvisedFileRegion.java
new file mode 100644
index 0000000..17a753f
--- /dev/null
+++ b/tajo-yarn/src/main/java/org/apache/tajo/yarn/FadvisedFileRegion.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.yarn;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.ReadaheadPool;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.tajo.pullserver.PullServerUtil;
+import org.jboss.netty.channel.DefaultFileRegion;
+
+import java.io.FileDescriptor;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.WritableByteChannel;
+
+/**
+ * A {@link DefaultFileRegion} used by the pull server to send one region of a local file.
+ *
+ * <p>Compared with the plain Netty region it can
+ * <ul>
+ *   <li>issue readahead requests through a {@link ReadaheadPool} before each transfer
+ *       (only when native IO is possible, OS cache management is enabled and a pool is given),</li>
+ *   <li>copy data through an intermediate heap buffer ({@link #customShuffleTransfer})
+ *       instead of {@code transferTo} when {@code shuffleTransferToAllowed} is false, and</li>
+ *   <li>advise the OS that the region no longer needs to be cached once the transfer succeeded
+ *       ({@link #transferSuccessful()}).</li>
+ * </ul>
+ */
+public class FadvisedFileRegion extends DefaultFileRegion {
+
+  private static final Log LOG = LogFactory.getLog(FadvisedFileRegion.class);
+
+  private final boolean manageOsCache;
+  private final int readaheadLength;
+  private final ReadaheadPool readaheadPool;
+  private final FileDescriptor fd;
+  private final String identifier;
+  private final long count;
+  private final long position;
+  private final int shuffleBufferSize;
+  private final boolean shuffleTransferToAllowed;
+  // Set to null by releaseExternalResources(); transferSuccessful() skips the fadvise call then.
+  private FileChannel fileChannel;
+
+  // Last request returned by the readahead pool; handed back on the next readaheadStream() call.
+  private ReadaheadPool.ReadaheadRequest readaheadRequest;
+
+  /** Size of the intermediate buffer used when {@code transferTo} is not allowed. */
+  public static final int DEFAULT_SHUFFLE_BUFFER_SIZE = 128 * 1024;
+
+  /**
+   * Creates a region that uses {@code transferTo} and the default shuffle buffer size.
+   *
+   * @param file            the open file to read from
+   * @param position        start offset of the region within the file
+   * @param count           number of bytes in the region
+   * @param manageOsCache   whether readahead and posix_fadvise calls should be issued
+   * @param readaheadLength number of bytes to request per readahead
+   * @param readaheadPool   pool used for readahead; null disables readahead
+   * @param identifier      name used for readahead/fadvise requests and log messages
+   * @throws IOException if the file descriptor cannot be obtained
+   */
+  public FadvisedFileRegion(RandomAccessFile file, long position, long count,
+                            boolean manageOsCache, int readaheadLength, ReadaheadPool readaheadPool,
+                            String identifier) throws IOException {
+    this(file, position, count, manageOsCache, readaheadLength, readaheadPool,
+        identifier, DEFAULT_SHUFFLE_BUFFER_SIZE, true);
+  }
+
+  /**
+   * Creates a region.
+   *
+   * @param file                     the open file to read from
+   * @param position                 start offset of the region within the file
+   * @param count                    number of bytes in the region
+   * @param manageOsCache            whether readahead and posix_fadvise calls should be issued
+   * @param readaheadLength          number of bytes to request per readahead
+   * @param readaheadPool            pool used for readahead; null disables readahead
+   * @param identifier               name used for readahead/fadvise requests and log messages
+   * @param shuffleBufferSize        size of the buffer used by {@link #customShuffleTransfer}
+   * @param shuffleTransferToAllowed if false, {@link #transferTo} copies through a local buffer
+   *                                 instead of delegating to {@code super.transferTo}
+   * @throws IOException if the file descriptor cannot be obtained
+   */
+  public FadvisedFileRegion(RandomAccessFile file, long position, long count,
+                            boolean manageOsCache, int readaheadLength, ReadaheadPool readaheadPool,
+                            String identifier, int shuffleBufferSize,
+                            boolean shuffleTransferToAllowed) throws IOException {
+    super(file.getChannel(), position, count);
+    this.manageOsCache = manageOsCache;
+    this.readaheadLength = readaheadLength;
+    this.readaheadPool = readaheadPool;
+    this.fd = file.getFD();
+    this.identifier = identifier;
+    this.fileChannel = file.getChannel();
+    this.count = count;
+    this.position = position;
+    this.shuffleBufferSize = shuffleBufferSize;
+    this.shuffleTransferToAllowed = shuffleTransferToAllowed;
+  }
+
+  /**
+   * Transfers bytes of this region, starting {@code position} bytes into the region, to
+   * {@code target}. A readahead request is issued first when enabled.
+   *
+   * @param target   channel to write to
+   * @param position offset within this region to start from
+   * @return the number of bytes written to {@code target}
+   * @throws IOException on read or write failure
+   */
+  @Override
+  public long transferTo(WritableByteChannel target, long position)
+      throws IOException {
+    if (PullServerUtil.isNativeIOPossible() && manageOsCache && readaheadPool != null) {
+      readaheadRequest = readaheadPool.readaheadStream(identifier, fd,
+          getPosition() + position, readaheadLength,
+          getPosition() + getCount(), readaheadRequest);
+    }
+
+    if (this.shuffleTransferToAllowed) {
+      return super.transferTo(target, position);
+    } else {
+      return customShuffleTransfer(target, position);
+    }
+  }
+
+  /**
+   * Transfers data using a local buffer: data is read from the file into a heap buffer and then
+   * written from the buffer to the target. This is used only if transferTo is disallowed in the
+   * configuration. super.transferTo does not perform well on Windows because of the small IO
+   * requests it generates; this method controls the IO request size through the size of the
+   * intermediate buffer.
+   *
+   * <p>If the target accepts no more bytes (a write returns 0, as a non-blocking channel does when
+   * its send buffer is full), the transfer stops early instead of spinning, and only the bytes that
+   * were actually written are reported. A caller that needs the rest calls again from
+   * {@code position + returned value}.
+   *
+   * @param target   channel to write to
+   * @param position offset within this region to start from
+   * @return the number of bytes written to {@code target}
+   * @throws IllegalArgumentException if {@code position} is negative or beyond the region
+   * @throws IOException on read or write failure
+   */
+  @VisibleForTesting
+  long customShuffleTransfer(WritableByteChannel target, long position)
+      throws IOException {
+    long actualCount = this.count - position;
+    if (actualCount < 0 || position < 0) {
+      throw new IllegalArgumentException(
+          "position out of range: " + position +
+              " (expected: 0 - " + (this.count - 1) + ')');
+    }
+    if (actualCount == 0) {
+      return 0L;
+    }
+
+    // Bytes of the requested range that have not been written to the target yet.
+    long trans = actualCount;
+    int readSize;
+    ByteBuffer byteBuffer = ByteBuffer.allocate(this.shuffleBufferSize);
+
+    while (trans > 0L &&
+        (readSize = fileChannel.read(byteBuffer, this.position + position)) > 0) {
+      if (readSize < trans) {
+        byteBuffer.flip();
+      } else {
+        // We can read more than we need if actualCount is not a multiple of the buffer size and
+        // the file is big enough. In that case flip() would expose too much, so set the limit to
+        // the remaining byte count manually.
+        byteBuffer.limit((int) trans);
+        byteBuffer.position(0);
+      }
+
+      // Write the buffered data; counters advance only by what the target actually accepted.
+      while (byteBuffer.hasRemaining()) {
+        int written = target.write(byteBuffer);
+        if (written <= 0) {
+          break;
+        }
+        trans -= written;
+        position += written;
+      }
+      if (byteBuffer.hasRemaining()) {
+        // The target cannot take more right now; report the partial progress instead of spinning.
+        break;
+      }
+
+      byteBuffer.clear();
+    }
+
+    return actualCount - trans;
+  }
+
+  /**
+   * Cancels any outstanding readahead request, drops the reference to the file channel and
+   * releases the resources held by the parent region.
+   */
+  @Override
+  public void releaseExternalResources() {
+    if (readaheadRequest != null) {
+      readaheadRequest.cancel();
+      readaheadRequest = null;
+    }
+    fileChannel = null;
+    super.releaseExternalResources();
+  }
+
+  /**
+   * Call when the transfer completes successfully so we can advise the OS that
+   * we don't need the region to be cached anymore (POSIX_FADV_DONTNEED). Failures are
+   * logged and otherwise ignored; nothing is done after {@link #releaseExternalResources()}.
+   */
+  public void transferSuccessful() {
+    if (PullServerUtil.isNativeIOPossible() && manageOsCache && getCount() > 0 && fileChannel != null) {
+      try {
+        PullServerUtil.posixFadviseIfPossible(identifier, fd, getPosition(), getCount(),
+            NativeIO.POSIX.POSIX_FADV_DONTNEED);
+      } catch (Throwable t) {
+        LOG.warn("Failed to manage OS cache for " + identifier, t);
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-yarn/src/main/java/org/apache/tajo/yarn/FileCloseListener.java
----------------------------------------------------------------------
diff --git a/tajo-yarn/src/main/java/org/apache/tajo/yarn/FileCloseListener.java b/tajo-yarn/src/main/java/org/apache/tajo/yarn/FileCloseListener.java
new file mode 100644
index 0000000..a85d0b9
--- /dev/null
+++ b/tajo-yarn/src/main/java/org/apache/tajo/yarn/FileCloseListener.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.yarn;
+
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+
+/**
+ * Completes a {@link FadvisedFileRegion} transfer once its write future finishes: on success the
+ * region is notified via {@link FadvisedFileRegion#transferSuccessful()}, and in every case the
+ * region's external resources are released.
+ */
+public class FileCloseListener implements ChannelFutureListener {
+
+  private final FadvisedFileRegion filePart;
+
+  public FileCloseListener(FadvisedFileRegion filePart) {
+    this.filePart = filePart;
+  }
+
+  // TODO error handling; distinguish IO/connection failures,
+  // attribute to appropriate spill output
+  @Override
+  public void operationComplete(ChannelFuture future) {
+    try {
+      if (future.isSuccess()) {
+        filePart.transferSuccessful();
+      }
+    } finally {
+      // Release even if the success callback throws, so the region's resources are not leaked.
+      filePart.releaseExternalResources();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-yarn/src/main/java/org/apache/tajo/yarn/TajoPullServerService.java
----------------------------------------------------------------------
diff --git a/tajo-yarn/src/main/java/org/apache/tajo/yarn/TajoPullServerService.java b/tajo-yarn/src/main/java/org/apache/tajo/yarn/TajoPullServerService.java
new file mode 100644
index 0000000..369e17f
--- /dev/null
+++ b/tajo-yarn/src/main/java/org/apache/tajo/yarn/TajoPullServerService.java
@@ -0,0 +1,608 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.yarn;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalListener;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.gson.Gson;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.ReadaheadPool;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MutableCounterInt;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
+import org.apache.hadoop.security.ssl.SSLFactory;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
+import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
+import org.apache.hadoop.yarn.server.api.AuxiliaryService;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.exception.InvalidURLException;
+import org.apache.tajo.exception.TajoInternalError;
+import org.apache.tajo.pullserver.PullServerConstants;
+import org.apache.tajo.pullserver.PullServerUtil;
+import org.apache.tajo.pullserver.PullServerUtil.PullServerParams;
+import org.apache.tajo.pullserver.retriever.FileChunk;
+import org.apache.tajo.pullserver.retriever.IndexCacheKey;
+import org.apache.tajo.storage.index.bst.BSTIndex;
+import org.apache.tajo.storage.index.bst.BSTIndex.BSTIndexReader;
+import org.apache.tajo.util.SizeOf;
+import org.apache.tajo.util.TajoIdUtils;
+import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.*;
+import org.jboss.netty.channel.group.ChannelGroup;
+import org.jboss.netty.channel.group.DefaultChannelGroup;
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+import org.jboss.netty.handler.codec.http.*;
+import org.jboss.netty.handler.codec.http.HttpHeaders.Names;
+import org.jboss.netty.handler.codec.http.HttpHeaders.Values;
+import org.jboss.netty.handler.ssl.SslHandler;
+import org.jboss.netty.handler.stream.ChunkedWriteHandler;
+import org.jboss.netty.util.CharsetUtil;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.*;
+
+import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;
+
+public class TajoPullServerService extends AuxiliaryService {
+
+ private static final Log LOG = LogFactory.getLog(TajoPullServerService.class);
+
+ private TajoConf tajoConf;
+
+ private int port;
+ private ChannelFactory selector;
+ private final ChannelGroup accepted = new DefaultChannelGroup("Pull server group");
+ private HttpChannelInitializer channelInitializer;
+ private int sslFileBufferSize;
+ private int maxUrlLength;
+
+ private ApplicationId appId;
+ private FileSystem localFS;
+
+ /**
+ * Should the shuffle use posix_fadvise calls to manage the OS cache during
+ * sendfile
+ */
+ private boolean manageOsCache;
+ private int readaheadLength;
+ private ReadaheadPool readaheadPool = ReadaheadPool.getInstance();
+
+ private static final Map<String,String> userRsrc =
+ new ConcurrentHashMap<>();
+ private String userName;
+
+ private LoadingCache<IndexCacheKey, BSTIndexReader> indexReaderCache = null;
+ private int lowCacheHitCheckThreshold;
+
+ @Metrics(name="PullServerShuffleMetrics", about="PullServer output metrics", context="tajo")
+ static class ShuffleMetrics implements ChannelFutureListener {
+ @Metric({"OutputBytes","PullServer output in bytes"})
+ MutableCounterLong shuffleOutputBytes;
+ @Metric({"Failed","# of failed shuffle outputs"})
+ MutableCounterInt shuffleOutputsFailed;
+ @Metric({"Succeeded","# of succeeded shuffle outputs"})
+ MutableCounterInt shuffleOutputsOK;
+ @Metric({"Connections","# of current shuffle connections"})
+ MutableGaugeInt shuffleConnections;
+
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ if (future.isSuccess()) {
+ shuffleOutputsOK.incr();
+ } else {
+ shuffleOutputsFailed.incr();
+ }
+ shuffleConnections.decr();
+ }
+ }
+
+ final ShuffleMetrics metrics;
+
+ TajoPullServerService(MetricsSystem ms) {
+ super(PullServerConstants.PULLSERVER_SERVICE_NAME);
+ metrics = ms.register(new ShuffleMetrics());
+ }
+
+ @SuppressWarnings("UnusedDeclaration")
+ public TajoPullServerService() {
+ this(DefaultMetricsSystem.instance());
+ }
+
+ @Override
+ public void initializeApplication(ApplicationInitializationContext context) {
+ // TODO these bytes should be versioned
+ // TODO: Once SHuffle is out of NM, this can use MR APIs
+ String user = context.getUser();
+ ApplicationId appId = context.getApplicationId();
+ // ByteBuffer secret = context.getApplicationDataForService();
+ userRsrc.put(appId.toString(), user);
+ }
+
+ @Override
+ public void stopApplication(ApplicationTerminationContext context) {
+ userRsrc.remove(context.getApplicationId().toString());
+ }
+
+ // TODO change AbstractService to throw InterruptedException
+ @Override
+ public void serviceInit(Configuration conf) throws Exception {
+ tajoConf = new TajoConf(conf);
+
+ manageOsCache = tajoConf.getBoolean(PullServerConstants.SHUFFLE_MANAGE_OS_CACHE,
+ PullServerConstants.DEFAULT_SHUFFLE_MANAGE_OS_CACHE);
+
+ readaheadLength = tajoConf.getInt(PullServerConstants.SHUFFLE_READAHEAD_BYTES,
+ PullServerConstants.DEFAULT_SHUFFLE_READAHEAD_BYTES);
+
+ int workerNum = tajoConf.getIntVar(ConfVars.SHUFFLE_RPC_SERVER_WORKER_THREAD_NUM);
+
+ ThreadFactory bossFactory = new ThreadFactoryBuilder()
+ .setNameFormat("TajoPullServerService Netty Boss #%d")
+ .build();
+ ThreadFactory workerFactory = new ThreadFactoryBuilder()
+ .setNameFormat("TajoPullServerService Netty Worker #%d")
+ .build();
+ selector = new NioServerSocketChannelFactory(
+ Executors.newCachedThreadPool(bossFactory),
+ Executors.newCachedThreadPool(workerFactory),
+ workerNum);
+
+ localFS = new LocalFileSystem();
+
+ maxUrlLength = tajoConf.getIntVar(ConfVars.PULLSERVER_FETCH_URL_MAX_LENGTH);
+
+ LOG.info("Tajo PullServer initialized: readaheadLength=" + readaheadLength);
+
+ ServerBootstrap bootstrap = new ServerBootstrap(selector);
+ try {
+ channelInitializer = new HttpChannelInitializer(tajoConf);
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ bootstrap.setPipelineFactory(channelInitializer);
+
+ port = tajoConf.getIntVar(ConfVars.PULLSERVER_PORT);
+ Channel ch = bootstrap.bind(new InetSocketAddress(port));
+
+ accepted.add(ch);
+ port = ((InetSocketAddress)ch.getLocalAddress()).getPort();
+ tajoConf.set(ConfVars.PULLSERVER_PORT.varname, Integer.toString(port));
+ LOG.info(getName() + " listening on port " + port);
+
+ sslFileBufferSize = tajoConf.getInt(PullServerConstants.SUFFLE_SSL_FILE_BUFFER_SIZE_KEY,
+ PullServerConstants.DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE);
+
+ int cacheSize = tajoConf.getIntVar(ConfVars.PULLSERVER_CACHE_SIZE);
+ int cacheTimeout = tajoConf.getIntVar(ConfVars.PULLSERVER_CACHE_TIMEOUT);
+
+ indexReaderCache = CacheBuilder.newBuilder()
+ .maximumSize(cacheSize)
+ .expireAfterWrite(cacheTimeout, TimeUnit.MINUTES)
+ .removalListener(removalListener)
+ .build(
+ new CacheLoader<IndexCacheKey, BSTIndexReader>() {
+ @Override
+ public BSTIndexReader load(IndexCacheKey key) throws Exception {
+ return new BSTIndex(tajoConf).getIndexReader(new Path(key.getPath(), "index"));
+ }
+ }
+ );
+ lowCacheHitCheckThreshold = (int) (cacheSize * 0.1f);
+
+ super.serviceInit(tajoConf);
+ LOG.info("TajoPullServerService started: port=" + port);
+ }
+
+ @Override
+ public void serviceStop() throws Exception {
+ // TODO: check this wait
+ accepted.close().awaitUninterruptibly(10, TimeUnit.SECONDS);
+ if (selector != null) {
+ ServerBootstrap bootstrap = new ServerBootstrap(selector);
+ bootstrap.releaseExternalResources();
+ }
+
+ if (channelInitializer != null) {
+ channelInitializer.destroy();
+ }
+
+ localFS.close();
+ indexReaderCache.invalidateAll();
+
+ super.serviceStop();
+ }
+
+ @VisibleForTesting
+ public int getPort() {
+ return port;
+ }
+
+ @Override
+ public ByteBuffer getMetaData() {
+ try {
+ return serializeMetaData(port);
+ } catch (IOException e) {
+ LOG.error("Error during getMeta", e);
+ // TODO add API to AuxiliaryServices to report failures
+ return null;
+ }
+ }
+
+ /**
+ * Serialize the shuffle port into a ByteBuffer for use later on.
+ * @param port the port to be sent to the ApplciationMaster
+ * @return the serialized form of the port.
+ */
+ public static ByteBuffer serializeMetaData(int port) throws IOException {
+ //TODO these bytes should be versioned
+ return ByteBuffer.allocate(SizeOf.SIZE_OF_INT).putInt(port);
+ }
+
+ class HttpChannelInitializer implements ChannelPipelineFactory {
+
+ final PullServer PullServer;
+ private SSLFactory sslFactory;
+
+ public HttpChannelInitializer(TajoConf conf) throws Exception {
+ PullServer = new PullServer(conf);
+ if (conf.getBoolVar(ConfVars.SHUFFLE_SSL_ENABLED_KEY)) {
+ sslFactory = new SSLFactory(SSLFactory.Mode.SERVER, conf);
+ sslFactory.init();
+ }
+ }
+
+ public void destroy() {
+ if (sslFactory != null) {
+ sslFactory.destroy();
+ }
+ }
+
+ @Override
+ public ChannelPipeline getPipeline() throws Exception {
+ ChannelPipeline pipeline = Channels.pipeline();
+ if (sslFactory != null) {
+ pipeline.addLast("ssl", new SslHandler(sslFactory.createSSLEngine()));
+ }
+ int maxChunkSize = getConfig().getInt(ConfVars.SHUFFLE_FETCHER_CHUNK_MAX_SIZE.varname,
+ ConfVars.SHUFFLE_FETCHER_CHUNK_MAX_SIZE.defaultIntVal);
+ pipeline.addLast("codec", new HttpServerCodec(maxUrlLength, 8192, maxChunkSize));
+ pipeline.addLast("aggregator", new HttpChunkAggregator(1 << 16));
+ pipeline.addLast("chunking", new ChunkedWriteHandler());
+ pipeline.addLast("shuffle", PullServer);
+ return pipeline;
+ // TODO factor security manager into pipeline
+ // TODO factor out encode/decode to permit binary shuffle
+ // TODO factor out decode of index to permit alt. models
+ }
+ }
+
+ @ChannelHandler.Sharable
+ class PullServer extends SimpleChannelUpstreamHandler {
+
+ private final TajoConf conf;
+ private final LocalDirAllocator lDirAlloc =
+ new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);
+ private final Gson gson = new Gson();
+
+ public PullServer(TajoConf conf) throws IOException {
+ this.conf = conf;
+
+ // init local temporal dir
+ lDirAlloc.getAllLocalPathsToRead(".", conf);
+ }
+
+ @Override
+ public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent evt) throws Exception {
+ accepted.add(evt.getChannel());
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug(String.format("Current number of shuffle connections (%d)", accepted.size()));
+ }
+ super.channelOpen(ctx, evt);
+ }
+
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt)
+ throws Exception {
+
+ HttpRequest request = (HttpRequest) evt.getMessage();
+ Channel ch = evt.getChannel();
+
+ if (request.getMethod() == HttpMethod.DELETE) {
+ HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NO_CONTENT);
+ ch.write(response).addListener(ChannelFutureListener.CLOSE);
+
+ clearIndexCache(request.getUri());
+ return;
+ } else if (request.getMethod() != HttpMethod.GET) {
+ sendError(ctx, HttpResponseStatus.METHOD_NOT_ALLOWED);
+ return;
+ }
+
+ // Parsing the URL into key-values
+ try {
+ final PullServerParams params = new PullServerParams(request.getUri());
+ if (PullServerUtil.isChunkRequest(params.requestType())) {
+ handleChunkRequest(ctx, request, params);
+ } else {
+ handleMetaRequest(ctx, request, params);
+ }
+ } catch (Throwable e) {
+ LOG.error("Failed to handle request " + request.getUri());
+ sendError(ctx, e.getMessage(), HttpResponseStatus.BAD_REQUEST);
+ return;
+ }
+ }
+
+ /**
+ * Upon a request from TajoWorker, this method clears index cache for fetching data of an execution block.
+ * It is called whenever an execution block is completed.
+ *
+ * @param uri query URI which indicates the execution block id
+ * @throws IOException
+ * @throws InvalidURLException
+ */
+ public void clearIndexCache(String uri)
+ throws IOException, InvalidURLException {
+ // Simply parse the given uri
+ String[] tokens = uri.split("=");
+ if (tokens.length != 2 || !tokens[0].equals("ebid")) {
+ throw new IllegalArgumentException("invalid params: " + uri);
+ }
+ ExecutionBlockId ebId = TajoIdUtils.createExecutionBlockId(tokens[1]);
+ String queryId = ebId.getQueryId().toString();
+ String ebSeqId = Integer.toString(ebId.getId());
+ List<IndexCacheKey> removed = new ArrayList<>();
+ synchronized (indexReaderCache) {
+ for (Entry<IndexCacheKey, BSTIndexReader> e : indexReaderCache.asMap().entrySet()) {
+ IndexCacheKey key = e.getKey();
+ if (key.getQueryId().equals(queryId) && key.getEbSeqId().equals(ebSeqId)) {
+ e.getValue().forceClose();
+ removed.add(e.getKey());
+ }
+ }
+ indexReaderCache.invalidateAll(removed);
+ }
+ removed.clear();
+ synchronized (waitForRemove) {
+ for (Entry<IndexCacheKey, BSTIndexReader> e : waitForRemove.entrySet()) {
+ IndexCacheKey key = e.getKey();
+ if (key.getQueryId().equals(queryId) && key.getEbSeqId().equals(ebSeqId)) {
+ e.getValue().forceClose();
+ removed.add(e.getKey());
+ }
+ }
+ for (IndexCacheKey eachKey : removed) {
+ waitForRemove.remove(eachKey);
+ }
+ }
+ }
+
+ private void handleMetaRequest(ChannelHandlerContext ctx, HttpRequest request, final PullServerParams params)
+ throws IOException, ExecutionException {
+ final List<String> jsonMetas;
+ try {
+ jsonMetas = PullServerUtil.getJsonMeta(conf, lDirAlloc, localFS, params, gson, indexReaderCache,
+ lowCacheHitCheckThreshold);
+ } catch (FileNotFoundException e) {
+ sendError(ctx, e.getMessage(), HttpResponseStatus.NO_CONTENT);
+ return;
+ } catch (IOException | IllegalArgumentException e) { // IOException, EOFException, IllegalArgumentException
+ sendError(ctx, e.getMessage(), HttpResponseStatus.BAD_REQUEST);
+ return;
+ } catch (ExecutionException e) {
+ // There are some problems in index cache
+ throw new TajoInternalError(e.getCause());
+ }
+
+ HttpResponse response = new DefaultHttpResponse(HTTP_1_1, HttpResponseStatus.OK);
+ response.setContent(ChannelBuffers.copiedBuffer(gson.toJson(jsonMetas), CharsetUtil.UTF_8));
+ response.setHeader(Names.CONTENT_TYPE, "application/json; charset=UTF-8");
+ HttpHeaders.setContentLength(response, response.getContent().readableBytes());
+ if (HttpHeaders.isKeepAlive(request)) {
+ response.setHeader(Names.CONNECTION, Values.KEEP_ALIVE);
+ }
+ ChannelFuture writeFuture = ctx.getChannel().write(response);
+
+ // Decide whether to close the connection or not.
+ if (!HttpHeaders.isKeepAlive(request)) {
+ // Close the connection when the whole content is written out.
+ writeFuture.addListener(ChannelFutureListener.CLOSE);
+ }
+ }
+
+ private void handleChunkRequest(ChannelHandlerContext ctx, HttpRequest request, final PullServerParams params)
+ throws IOException {
+ final List<FileChunk> chunks;
+ try {
+ chunks = PullServerUtil.getFileChunks(conf, lDirAlloc, localFS, params, indexReaderCache,
+ lowCacheHitCheckThreshold);
+ } catch (FileNotFoundException e) {
+ sendError(ctx, e.getMessage(), HttpResponseStatus.NO_CONTENT);
+ return;
+ } catch (IOException | IllegalArgumentException e) { // IOException, EOFException, IllegalArgumentException
+ sendError(ctx, e.getMessage(), HttpResponseStatus.BAD_REQUEST);
+ return;
+ } catch (ExecutionException e) {
+ // There are some problems in index cache
+ throw new TajoInternalError(e.getCause());
+ }
+
+ // Write the content.
+ final Channel ch = ctx.getChannel();
+ if (chunks.size() == 0) {
+ HttpResponse response = new DefaultHttpResponse(HTTP_1_1, HttpResponseStatus.NO_CONTENT);
+
+ if (!HttpHeaders.isKeepAlive(request)) {
+ ch.write(response).addListener(ChannelFutureListener.CLOSE);
+ } else {
+ response.setHeader(Names.CONNECTION, Values.KEEP_ALIVE);
+ ch.write(response);
+ }
+ } else {
+ FileChunk[] file = chunks.toArray(new FileChunk[chunks.size()]);
+ ChannelFuture writeFuture = null;
+ HttpResponse response = new DefaultHttpResponse(HTTP_1_1, HttpResponseStatus.OK);
+ long totalSize = 0;
+ StringBuilder sb = new StringBuilder();
+ for (FileChunk chunk : file) {
+ totalSize += chunk.length();
+ sb.append(Long.toString(chunk.length())).append(",");
+ }
+ sb.deleteCharAt(sb.length() - 1);
+ HttpHeaders.addHeader(response, PullServerConstants.CHUNK_LENGTH_HEADER_NAME, sb.toString());
+ HttpHeaders.setContentLength(response, totalSize);
+
+ if (HttpHeaders.isKeepAlive(request)) {
+ response.setHeader(Names.CONNECTION, Values.KEEP_ALIVE);
+ }
+ // Write the initial line and the header.
+ writeFuture = ch.write(response);
+
+ for (FileChunk chunk : file) {
+ writeFuture = sendFile(ctx, chunk);
+ if (writeFuture == null) {
+ sendError(ctx, HttpResponseStatus.NOT_FOUND);
+ return;
+ }
+ }
+
+ // Decide whether to close the connection or not.
+ if (!HttpHeaders.isKeepAlive(request)) {
+ // Close the connection when the whole content is written out.
+ writeFuture.addListener(ChannelFutureListener.CLOSE);
+ }
+ }
+ }
+
+ private ChannelFuture sendFile(ChannelHandlerContext ctx,
+ FileChunk file) throws IOException {
+ Channel ch = ctx.getChannel();
+ RandomAccessFile spill = null;
+ ChannelFuture writeFuture;
+ try {
+ spill = new RandomAccessFile(file.getFile(), "r");
+ if (ctx.getPipeline().get(SslHandler.class) == null) {
+ final FadvisedFileRegion filePart = new FadvisedFileRegion(spill,
+ file.startOffset(), file.length(), manageOsCache, readaheadLength,
+ readaheadPool, file.getFile().getAbsolutePath());
+ writeFuture = ch.write(filePart);
+ writeFuture.addListener(new FileCloseListener(filePart));
+ } else {
+ // HTTPS cannot be done with zero copy.
+ final FadvisedChunkedFile chunk = new FadvisedChunkedFile(spill,
+ file.startOffset(), file.length(), sslFileBufferSize,
+ manageOsCache, readaheadLength, readaheadPool,
+ file.getFile().getAbsolutePath());
+ writeFuture = ch.write(chunk);
+ }
+ } catch (FileNotFoundException e) {
+ LOG.fatal(file.getFile() + " not found");
+ return null;
+ } catch (Throwable e) {
+ LOG.fatal("error while sending a file: ", e);
+ if (spill != null) {
+ //should close a opening file
+ LOG.warn("Close the file " + file.getFile().getAbsolutePath());
+ spill.close();
+ }
+ return null;
+ }
+ metrics.shuffleConnections.incr();
+ metrics.shuffleOutputBytes.incr(file.length()); // optimistic
+ return writeFuture;
+ }
+
+ private void sendError(ChannelHandlerContext ctx,
+ HttpResponseStatus status) {
+ sendError(ctx, "", status);
+ }
+
+ private void sendError(ChannelHandlerContext ctx, String message,
+ HttpResponseStatus status) {
+ HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status);
+ response.setHeader(Names.CONTENT_TYPE, "text/plain; charset=UTF-8");
+ // Put shuffle version into http header
+ ChannelBuffer content = ChannelBuffers.copiedBuffer(message, CharsetUtil.UTF_8);
+ response.setContent(content);
+ response.setHeader(Names.CONTENT_LENGTH, content.writerIndex());
+
+ // Close the connection as soon as the error message is sent.
+ ctx.getChannel().write(response).addListener(ChannelFutureListener.CLOSE);
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
+ throws Exception {
+ Channel ch = e.getChannel();
+ Throwable cause = e.getCause();
+ LOG.error(cause.getMessage(), cause);
+ if (ch.isOpen()) {
+ ch.close();
+ }
+ }
+ }
+
+ // Temporal space to wait for the completion of all index lookup operations
+ private final ConcurrentHashMap<IndexCacheKey, BSTIndexReader> waitForRemove = new ConcurrentHashMap<>();
+
+ // RemovalListener is triggered when an item is removed from the index reader cache.
+ // It closes index readers when they are not used anymore.
+ // If they are still being used, they are moved to waitForRemove map to wait for other operations' completion.
+ private final RemovalListener<IndexCacheKey, BSTIndexReader> removalListener = (removal) -> {
+ BSTIndexReader reader = removal.getValue();
+ if (reader.getReferenceNum() == 0) {
+ try {
+ reader.close(); // tear down properly
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ waitForRemove.remove(removal.getKey());
+ } else {
+ waitForRemove.put(removal.getKey(), reader);
+ }
+ };
+}
[3/4] tajo git commit: TAJO-2122: PullServer as an Auxiliary service
of Yarn.
Posted by ji...@apache.org.
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-core/src/main/java/org/apache/tajo/worker/LocalFetcher.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/LocalFetcher.java b/tajo-core/src/main/java/org/apache/tajo/worker/LocalFetcher.java
new file mode 100644
index 0000000..e346bdf
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/LocalFetcher.java
@@ -0,0 +1,480 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.tajo.worker;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.gson.Gson;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.*;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.http.*;
+import io.netty.handler.timeout.ReadTimeoutException;
+import io.netty.handler.timeout.ReadTimeoutHandler;
+import io.netty.util.ReferenceCountUtil;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.TajoProtos;
+import org.apache.tajo.TajoProtos.FetcherState;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.exception.TajoInternalError;
+import org.apache.tajo.pullserver.PullServerConstants;
+import org.apache.tajo.pullserver.PullServerConstants.Param;
+import org.apache.tajo.pullserver.PullServerUtil;
+import org.apache.tajo.pullserver.PullServerUtil.PullServerParams;
+import org.apache.tajo.pullserver.PullServerUtil.PullServerRequestURIBuilder;
+import org.apache.tajo.pullserver.TajoPullServerService;
+import org.apache.tajo.pullserver.retriever.FileChunk;
+import org.apache.tajo.pullserver.retriever.FileChunkMeta;
+import org.apache.tajo.rpc.NettyUtils;
+import org.apache.tajo.storage.HashShuffleAppenderManager;
+import org.apache.tajo.storage.StorageUtil;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * LocalFetcher retrieves locally stored data. Its behavior can be different according to the pull server is running
+ * externally or internally.
+ *
+ * <ul>
+ * <li>When an internal pull server is running, local fetchers can retrieve data directly.</li>
+ * <li>When an external pull server is running,</li>
+ * <ul>
+ * <li>If the shuffle type is hash, local fetchers can still retrieve data directly.</li>
+ * <li>If the shuffle type is range, local fetchers need to get meta information of data via HTTP. Once the meta
+ * information is retrieved, they can read data directly.</li>
+ * </ul>
+ * </ul>
+ */
+public class LocalFetcher extends AbstractFetcher {
+
+ private final static Log LOG = LogFactory.getLog(LocalFetcher.class);
+
+ private final TajoPullServerService pullServerService;
+
+ private final String host;
+ private int port;
+ private final Bootstrap bootstrap;
+ private final int maxUrlLength;
+ private final List<FileChunkMeta> chunkMetas = new ArrayList<>();
+ private final String tableName;
+ private final FileSystem localFileSystem;
+ private final LocalDirAllocator localDirAllocator;
+
+ @VisibleForTesting
+ public LocalFetcher(TajoConf conf, URI uri, String tableName) throws IOException {
+ super(conf, uri);
+ this.maxUrlLength = conf.getIntVar(ConfVars.PULLSERVER_FETCH_URL_MAX_LENGTH);
+ this.tableName = tableName;
+ this.localFileSystem = new LocalFileSystem();
+ this.localDirAllocator = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);
+ this.pullServerService = null;
+
+ String scheme = uri.getScheme() == null ? "http" : uri.getScheme();
+ this.host = uri.getHost() == null ? "localhost" : uri.getHost();
+ this.port = uri.getPort();
+ if (port == -1) {
+ if (scheme.equalsIgnoreCase("http")) {
+ this.port = 80;
+ } else if (scheme.equalsIgnoreCase("https")) {
+ this.port = 443;
+ }
+ }
+
+ bootstrap = new Bootstrap()
+ .group(
+ NettyUtils.getSharedEventLoopGroup(NettyUtils.GROUP.FETCHER,
+ conf.getIntVar(ConfVars.SHUFFLE_RPC_CLIENT_WORKER_THREAD_NUM)))
+ .channel(NioSocketChannel.class)
+ .option(ChannelOption.ALLOCATOR, NettyUtils.ALLOCATOR)
+ .option(ChannelOption.CONNECT_TIMEOUT_MILLIS,
+ conf.getIntVar(ConfVars.SHUFFLE_FETCHER_CONNECT_TIMEOUT) * 1000)
+ .option(ChannelOption.SO_RCVBUF, 1048576) // set 1M
+ .option(ChannelOption.TCP_NODELAY, true);
+ }
+
+ public LocalFetcher(TajoConf conf, URI uri, ExecutionBlockContext executionBlockContext, String tableName) {
+ super(conf, uri);
+ this.localFileSystem = executionBlockContext.getLocalFS();
+ this.localDirAllocator = executionBlockContext.getLocalDirAllocator();
+ this.maxUrlLength = conf.getIntVar(ConfVars.PULLSERVER_FETCH_URL_MAX_LENGTH);
+ this.tableName = tableName;
+
+ Optional<TajoPullServerService> optional = executionBlockContext.getSharedResource().getPullServerService();
+ if (optional.isPresent()) {
+ // local pull server service
+ this.pullServerService = optional.get();
+ this.host = null;
+ this.bootstrap = null;
+
+ } else if (PullServerUtil.useExternalPullServerService(conf)) {
+ // external pull server service
+ pullServerService = null;
+
+ String scheme = uri.getScheme() == null ? "http" : uri.getScheme();
+ this.host = uri.getHost() == null ? "localhost" : uri.getHost();
+ this.port = uri.getPort();
+ if (port == -1) {
+ if (scheme.equalsIgnoreCase("http")) {
+ this.port = 80;
+ } else if (scheme.equalsIgnoreCase("https")) {
+ this.port = 443;
+ }
+ }
+
+ bootstrap = new Bootstrap()
+ .group(
+ NettyUtils.getSharedEventLoopGroup(NettyUtils.GROUP.FETCHER,
+ conf.getIntVar(ConfVars.SHUFFLE_RPC_CLIENT_WORKER_THREAD_NUM)))
+ .channel(NioSocketChannel.class)
+ .option(ChannelOption.ALLOCATOR, NettyUtils.ALLOCATOR)
+ .option(ChannelOption.CONNECT_TIMEOUT_MILLIS,
+ conf.getIntVar(ConfVars.SHUFFLE_FETCHER_CONNECT_TIMEOUT) * 1000)
+ .option(ChannelOption.SO_RCVBUF, 1048576) // set 1M
+ .option(ChannelOption.TCP_NODELAY, true);
+ } else {
+ endFetch(FetcherState.FETCH_FAILED);
+ throw new TajoInternalError("Pull server service is not initialized");
+ }
+ }
+
+ @Override
+ public List<FileChunk> get() throws IOException {
+ this.startTime = System.currentTimeMillis();
+ return pullServerService != null ? getWithInternalPullServer() : getWithExternalPullServer();
+ }
+
+ private List<FileChunk> getWithInternalPullServer() throws IOException {
+ final List<FileChunk> fileChunks = new ArrayList<>();
+ PullServerParams params = new PullServerParams(uri.toString());
+ try {
+ fileChunks.addAll(pullServerService.getFileChunks(conf, localDirAllocator, params));
+ } catch (ExecutionException e) {
+ endFetch(FetcherState.FETCH_FAILED);
+ throw new TajoInternalError(e);
+ }
+ fileChunks.stream().forEach(c -> c.setEbId(tableName));
+ endFetch(FetcherState.FETCH_DATA_FINISHED);
+ if (fileChunks.size() > 0) {
+ fileLen = fileChunks.get(0).length();
+ fileNum = 1;
+ } else {
+ fileNum = 0;
+ fileLen = 0;
+ }
+ return fileChunks;
+ }
+
+ private List<FileChunk> getWithExternalPullServer() throws IOException {
+ final PullServerParams params = new PullServerParams(uri.toString());
+ final Path queryBaseDir = PullServerUtil.getBaseOutputDir(params.queryId(), params.ebId());
+
+ if (PullServerUtil.isRangeShuffle(params.shuffleType())) {
+ return getChunksForRangeShuffle(params, queryBaseDir);
+ } else if (PullServerUtil.isHashShuffle(params.shuffleType())) {
+ return getChunksForHashShuffle(params, queryBaseDir);
+ } else {
+ endFetch(FetcherState.FETCH_FAILED);
+ throw new IllegalArgumentException("unknown shuffle type: " + params.shuffleType());
+ }
+ }
+
+ private List<FileChunk> getChunksForHashShuffle(final PullServerParams params, final Path queryBaseDir)
+ throws IOException {
+ final List<FileChunk> fileChunks = new ArrayList<>();
+ final String partId = params.partId();
+ final long offset = params.offset();
+ final long length = params.length();
+ final int partParentId = HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), conf);
+ final Path partPath = StorageUtil.concatPath(queryBaseDir, "hash-shuffle", String.valueOf(partParentId), partId);
+
+ if (!localDirAllocator.ifExists(partPath.toString(), conf)) {
+ endFetch(FetcherState.FETCH_FAILED);
+ throw new IOException("Hash shuffle or Scattered hash shuffle - file not exist: " + partPath);
+ }
+ final Path path = localFileSystem.makeQualified(localDirAllocator.getLocalPathToRead(partPath.toString(), conf));
+ final File file = new File(path.toUri());
+ final long startPos = (offset >= 0 && length >= 0) ? offset : 0;
+ final long readLen = (offset >= 0 && length >= 0) ? length : file.length();
+
+ if (startPos >= file.length()) {
+ endFetch(FetcherState.FETCH_FAILED);
+ throw new IOException("Start pos[" + startPos + "] great than file length [" + file.length() + "]");
+ }
+ if (readLen > 0) {
+ final FileChunk chunk = new FileChunk(file, startPos, readLen);
+ chunk.setEbId(tableName);
+ chunk.setFromRemote(false);
+ fileChunks.add(chunk);
+ fileLen = chunk.length();
+ fileNum = 1;
+ }
+
+ endFetch(FetcherState.FETCH_DATA_FINISHED);
+ return fileChunks;
+ }
+
+ private List<FileChunk> getChunksForRangeShuffle(final PullServerParams params, final Path queryBaseDir)
+ throws IOException {
+ final List<FileChunk> fileChunks = new ArrayList<>();
+
+ if (state == FetcherState.FETCH_INIT) {
+ final ChannelInitializer<Channel> initializer = new HttpClientChannelInitializer();
+ bootstrap.handler(initializer);
+ }
+
+ this.state = FetcherState.FETCH_META_FETCHING;
+ ChannelFuture future = null;
+ try {
+ future = bootstrap.clone().connect(new InetSocketAddress(host, port))
+ .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
+
+ // Wait until the connection attempt succeeds or fails.
+ Channel channel = future.awaitUninterruptibly().channel();
+ if (!future.isSuccess()) {
+ endFetch(FetcherState.FETCH_FAILED);
+ throw new IOException(future.cause());
+ }
+
+ for (URI eachURI : createChunkMetaRequestURIs(host, port, params)) {
+ String query = eachURI.getPath()
+ + (eachURI.getRawQuery() != null ? "?" + eachURI.getRawQuery() : "");
+ HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, query);
+ request.headers().set(HttpHeaders.Names.HOST, host);
+ request.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
+ request.headers().set(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP);
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Status: " + getState() + ", URI:" + eachURI);
+ }
+ // Send the HTTP request.
+ channel.writeAndFlush(request);
+ }
+ // Wait for the server to close the connection. throw exception if failed
+ channel.closeFuture().syncUninterruptibly();
+
+ if (!state.equals(FetcherState.FETCH_META_FINISHED)) {
+ endFetch(FetcherState.FETCH_FAILED);
+ } else {
+ state = FetcherState.FETCH_DATA_FETCHING;
+ fileLen = fileNum = 0;
+ for (FileChunkMeta eachMeta : chunkMetas) {
+ Path outputPath = StorageUtil.concatPath(queryBaseDir, eachMeta.getTaskId(), "output");
+ if (!localDirAllocator.ifExists(outputPath.toString(), conf)) {
+ LOG.warn("Range shuffle - file not exist. " + outputPath);
+ continue;
+ }
+ Path path = localFileSystem.makeQualified(localDirAllocator.getLocalPathToRead(outputPath.toString(), conf));
+ File file = new File(URI.create(path.toUri() + "/output"));
+ FileChunk chunk = new FileChunk(file, eachMeta.getStartOffset(), eachMeta.getLength());
+ chunk.setEbId(tableName);
+ fileChunks.add(chunk);
+ fileLen += chunk.length();
+ fileNum++;
+ }
+ endFetch(FetcherState.FETCH_DATA_FINISHED);
+ }
+
+ return fileChunks;
+ } finally {
+ if(future != null && future.channel().isOpen()){
+ // Close the channel to exit.
+ future.channel().close().awaitUninterruptibly();
+ }
+ }
+ }
+
+ public class HttpClientHandler extends ChannelInboundHandlerAdapter {
+ private int length = -1;
+ private int totalReceivedContentLength = 0;
+ private byte[] buf;
+ private final Gson gson = new Gson();
+
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg)
+ throws Exception {
+
+ messageReceiveCount++;
+ if (msg instanceof HttpResponse) {
+ try {
+ HttpResponse response = (HttpResponse) msg;
+
+ StringBuilder sb = new StringBuilder();
+ if (LOG.isDebugEnabled()) {
+ sb.append("STATUS: ").append(response.getStatus()).append(", VERSION: ")
+ .append(response.getProtocolVersion()).append(", HEADER: ");
+ }
+ if (!response.headers().names().isEmpty()) {
+ for (String name : response.headers().names()) {
+ for (String value : response.headers().getAll(name)) {
+ if (LOG.isDebugEnabled()) {
+ sb.append(name).append(" = ").append(value);
+ }
+ if (this.length == -1 && name.equals("Content-Length")) {
+ this.length = Integer.parseInt(value);
+ if (buf == null || buf.length < this.length) {
+ buf = new byte[this.length];
+ }
+ }
+ }
+ }
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(sb.toString());
+ }
+
+ if (response.getStatus().code() == HttpResponseStatus.NO_CONTENT.code()) {
+ LOG.warn("There are no data corresponding to the request");
+ length = 0;
+ return;
+ } else if (response.getStatus().code() != HttpResponseStatus.OK.code()) {
+ LOG.error(response.getStatus().reasonPhrase());
+ state = TajoProtos.FetcherState.FETCH_FAILED;
+ return;
+ }
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ } finally {
+ ReferenceCountUtil.release(msg);
+ }
+ }
+
+ if (msg instanceof HttpContent) {
+ HttpContent httpContent = (HttpContent) msg;
+ ByteBuf content = httpContent.content();
+
+ if (state != FetcherState.FETCH_FAILED) {
+ try {
+ if (content.isReadable()) {
+ int contentLength = content.readableBytes();
+ content.readBytes(buf, totalReceivedContentLength, contentLength);
+ totalReceivedContentLength += contentLength;
+ }
+
+ if (msg instanceof LastHttpContent) {
+ if (totalReceivedContentLength == length) {
+ state = FetcherState.FETCH_META_FINISHED;
+
+ List<String> jsonMetas = gson.fromJson(new String(buf), List.class);
+ for (String eachJson : jsonMetas) {
+ FileChunkMeta meta = gson.fromJson(eachJson, FileChunkMeta.class);
+ chunkMetas.add(meta);
+ }
+ totalReceivedContentLength = 0;
+ length = -1;
+ } else {
+ endFetch(FetcherState.FETCH_FAILED);
+ throw new IOException("Invalid fetch meta length: " + totalReceivedContentLength + ", expected length: "
+ + length);
+ }
+ }
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ } finally {
+ ReferenceCountUtil.release(msg);
+ }
+ } else {
+ // http content contains the reason why the fetch failed.
+ LOG.error(content.toString(Charset.defaultCharset()));
+ }
+ }
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
+ throws Exception {
+ if (cause instanceof ReadTimeoutException) {
+ LOG.warn(cause.getMessage(), cause);
+ } else {
+ LOG.error("Fetch failed :", cause);
+ }
+
+ // this fetching will be retry
+ finishTime = System.currentTimeMillis();
+ state = TajoProtos.FetcherState.FETCH_FAILED;
+ ctx.close();
+ }
+
+ @Override
+ public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
+ if(getState() == FetcherState.FETCH_INIT || getState() == FetcherState.FETCH_META_FETCHING){
+ //channel is closed, but cannot complete fetcher
+ finishTime = System.currentTimeMillis();
+ LOG.error("Channel closed by peer: " + ctx.channel());
+ state = TajoProtos.FetcherState.FETCH_FAILED;
+ }
+
+ super.channelUnregistered(ctx);
+ }
+ }
+
+ public class HttpClientChannelInitializer extends ChannelInitializer<Channel> {
+
+ @Override
+ protected void initChannel(Channel channel) throws Exception {
+ ChannelPipeline pipeline = channel.pipeline();
+
+ int maxChunkSize = conf.getIntVar(ConfVars.SHUFFLE_FETCHER_CHUNK_MAX_SIZE);
+ int readTimeout = conf.getIntVar(ConfVars.SHUFFLE_FETCHER_READ_TIMEOUT);
+
+ pipeline.addLast("codec", new HttpClientCodec(4096, 8192, maxChunkSize));
+ pipeline.addLast("inflater", new HttpContentDecompressor());
+ pipeline.addLast("timeout", new ReadTimeoutHandler(readTimeout, TimeUnit.SECONDS));
+ pipeline.addLast("handler", new HttpClientHandler());
+ }
+ }
+
+ private List<URI> createChunkMetaRequestURIs(String pullServerAddr, int pullServerPort, PullServerParams params) {
+ final PullServerRequestURIBuilder builder = new PullServerRequestURIBuilder(pullServerAddr, pullServerPort, maxUrlLength);
+ builder.setRequestType(PullServerConstants.META_REQUEST_PARAM_STRING)
+ .setQueryId(params.queryId())
+ .setShuffleType(params.shuffleType())
+ .setEbId(params.ebId())
+ .setPartId(params.partId());
+
+ if (params.contains(Param.OFFSET)) {
+ builder.setOffset(params.offset()).setLength(params.length());
+ }
+
+ if (PullServerUtil.isRangeShuffle(params.shuffleType())) {
+ builder.setStartKeyBase64(params.startKey())
+ .setEndKeyBase64(params.endKey())
+ .setLastInclude(params.last());
+ }
+
+ if (params.contains(Param.TASK_ID)) {
+ builder.setTaskAttemptIds(params.taskAttemptIds());
+ }
+ return builder.build(true);
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-core/src/main/java/org/apache/tajo/worker/RemoteFetcher.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/RemoteFetcher.java b/tajo-core/src/main/java/org/apache/tajo/worker/RemoteFetcher.java
new file mode 100644
index 0000000..9a8a70b
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/RemoteFetcher.java
@@ -0,0 +1,317 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.*;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.http.*;
+import io.netty.handler.timeout.ReadTimeoutException;
+import io.netty.handler.timeout.ReadTimeoutHandler;
+import io.netty.util.ReferenceCountUtil;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.tajo.TajoProtos;
+import org.apache.tajo.TajoProtos.FetcherState;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.pullserver.PullServerConstants;
+import org.apache.tajo.pullserver.retriever.FileChunk;
+import org.apache.tajo.rpc.NettyUtils;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.nio.channels.FileChannel;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * RemoteFetcher fetches data from a given uri via HTTP protocol and stores them into
+ * a specific file. It aims at asynchronous and efficient data transmission.
+ */
+public class RemoteFetcher extends AbstractFetcher {
+
+  private final static Log LOG = LogFactory.getLog(RemoteFetcher.class);
+
+  private final String host;
+  private int port;
+
+  private final Bootstrap bootstrap;
+  // Lengths of the consecutive chunks stored in the fetched file, taken from the chunk length response header.
+  private final List<Long> chunkLengths = new ArrayList<>();
+
+  public RemoteFetcher(TajoConf conf, URI uri, FileChunk chunk) {
+    super(conf, uri, chunk);
+
+    String scheme = uri.getScheme() == null ? "http" : uri.getScheme();
+    this.host = uri.getHost() == null ? "localhost" : uri.getHost();
+    this.port = uri.getPort();
+    if (port == -1) {
+      if (scheme.equalsIgnoreCase("http")) {
+        this.port = 80;
+      } else if (scheme.equalsIgnoreCase("https")) {
+        this.port = 443;
+      }
+    }
+
+    bootstrap = new Bootstrap()
+        .group(
+            NettyUtils.getSharedEventLoopGroup(NettyUtils.GROUP.FETCHER,
+                conf.getIntVar(TajoConf.ConfVars.SHUFFLE_RPC_CLIENT_WORKER_THREAD_NUM)))
+        .channel(NioSocketChannel.class)
+        .option(ChannelOption.ALLOCATOR, NettyUtils.ALLOCATOR)
+        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS,
+            conf.getIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_CONNECT_TIMEOUT) * 1000)
+        .option(ChannelOption.SO_RCVBUF, 1048576) // set 1M
+        .option(ChannelOption.TCP_NODELAY, true);
+  }
+
+  /**
+   * Downloads the data addressed by {@link #uri} into the file of {@link #fileChunk} and splits it into chunks
+   * according to the chunk lengths reported by the server.
+   */
+  @Override
+  public List<FileChunk> get() throws IOException {
+    List<FileChunk> fileChunks = new ArrayList<>();
+
+    if (state == FetcherState.FETCH_INIT) {
+      ChannelInitializer<Channel> initializer = new HttpClientChannelInitializer(fileChunk.getFile());
+      bootstrap.handler(initializer);
+    }
+    // Discard chunk lengths reported during a previous (failed) attempt; a retry must not duplicate chunks.
+    chunkLengths.clear();
+
+    this.startTime = System.currentTimeMillis();
+    this.state = FetcherState.FETCH_DATA_FETCHING;
+    ChannelFuture future = null;
+    try {
+      future = bootstrap.clone().connect(new InetSocketAddress(host, port))
+          .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
+
+      // Wait until the connection attempt succeeds or fails.
+      Channel channel = future.awaitUninterruptibly().channel();
+      if (!future.isSuccess()) {
+        state = TajoProtos.FetcherState.FETCH_FAILED;
+        throw new IOException(future.cause());
+      }
+
+      String query = uri.getPath()
+          + (uri.getRawQuery() != null ? "?" + uri.getRawQuery() : "");
+      // Prepare the HTTP request.
+      HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, query);
+      request.headers().set(HttpHeaders.Names.HOST, host);
+      request.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
+      request.headers().set(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP);
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Status: " + getState() + ", URI:" + uri);
+      }
+      // Send the HTTP request.
+      channel.writeAndFlush(request);
+
+      // Wait for the server to close the connection. throw exception if failed
+      channel.closeFuture().syncUninterruptibly();
+
+      fileChunk.setLength(fileChunk.getFile().length());
+
+      // The fetched file holds the chunks back to back; slice it accordingly, skipping empty chunks.
+      long start = 0;
+      for (Long eachChunkLength : chunkLengths) {
+        if (eachChunkLength == 0) continue;
+        FileChunk chunk = new FileChunk(fileChunk.getFile(), start, eachChunkLength);
+        chunk.setEbId(fileChunk.getEbId());
+        chunk.setFromRemote(true);
+        fileChunks.add(chunk);
+        start += eachChunkLength;
+      }
+      return fileChunks;
+
+    } finally {
+      if (future != null && future.channel().isOpen()) {
+        // Close the channel to exit.
+        future.channel().close().awaitUninterruptibly();
+      }
+
+      this.finishTime = System.currentTimeMillis();
+      long elapsedMills = finishTime - startTime;
+      String transferSpeed;
+      if (elapsedMills > 1000) {
+        long bytePerSec = (fileChunk.length() * 1000) / elapsedMills;
+        transferSpeed = FileUtils.byteCountToDisplaySize(bytePerSec);
+      } else {
+        transferSpeed = FileUtils.byteCountToDisplaySize(Math.max(fileChunk.length(), 0));
+      }
+
+      LOG.info(String.format("Fetcher :%d ms elapsed. %s/sec, len:%d, state:%s, URL:%s",
+          elapsedMills, transferSpeed, fileChunk.length(), getState(), uri));
+    }
+  }
+
+  /** Streams the HTTP response body into {@code file} and records the reported chunk lengths. */
+  public class HttpClientHandler extends ChannelInboundHandlerAdapter {
+    private final File file;
+    private RandomAccessFile raf;
+    private FileChannel fc;
+    private long length = -1;
+    // long (not int) so that transfers larger than Integer.MAX_VALUE bytes do not overflow the counter.
+    private long totalReceivedContentLength = 0;
+
+    public HttpClientHandler(File file) throws FileNotFoundException {
+      this.file = file;
+      this.raf = new RandomAccessFile(file, "rw");
+      this.fc = raf.getChannel();
+    }
+
+    @Override
+    public void channelRead(ChannelHandlerContext ctx, Object msg)
+        throws Exception {
+
+      messageReceiveCount++;
+      try {
+        if (msg instanceof HttpResponse && !handleResponseHeader((HttpResponse) msg)) {
+          return;
+        }
+        if (msg instanceof HttpContent) {
+          handleContent((HttpContent) msg);
+        }
+      } finally {
+        // Release exactly once, after every read of this message (including failure-path contents).
+        ReferenceCountUtil.release(msg);
+      }
+    }
+
+    /**
+     * Inspects status and headers of a response.
+     *
+     * @return false if the content of this message must not be processed
+     */
+    private boolean handleResponseHeader(HttpResponse response) {
+      try {
+        StringBuilder sb = new StringBuilder();
+        if (LOG.isDebugEnabled()) {
+          sb.append("STATUS: ").append(response.getStatus()).append(", VERSION: ")
+              .append(response.getProtocolVersion()).append(", HEADER: ");
+        }
+        for (String name : response.headers().names()) {
+          for (String value : response.headers().getAll(name)) {
+            if (LOG.isDebugEnabled()) {
+              sb.append(name).append(" = ").append(value);
+            }
+            // HTTP header names are case-insensitive.
+            if (this.length == -1 && HttpHeaders.Names.CONTENT_LENGTH.equalsIgnoreCase(name)) {
+              this.length = Long.parseLong(value);
+            }
+          }
+        }
+        if (response.headers().contains(PullServerConstants.CHUNK_LENGTH_HEADER_NAME)) {
+          String stringOffset = response.headers().get(PullServerConstants.CHUNK_LENGTH_HEADER_NAME);
+
+          for (String eachSplit : stringOffset.split(",")) {
+            chunkLengths.add(Long.parseLong(eachSplit));
+          }
+        }
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(sb.toString());
+        }
+
+        int statusCode = response.getStatus().code();
+        if (statusCode == HttpResponseStatus.NO_CONTENT.code()) {
+          LOG.warn("There are no data corresponding to the request");
+          length = 0;
+          return false;
+        } else if (statusCode != HttpResponseStatus.OK.code()) {
+          LOG.error(response.getStatus().reasonPhrase(), response.getDecoderResult().cause());
+          endFetch(FetcherState.FETCH_FAILED);
+          return false;
+        }
+      } catch (Exception e) {
+        LOG.error(e.getMessage(), e);
+      }
+      return true;
+    }
+
+    /** Writes body bytes to the file; on the last content, validates the total length and closes the file. */
+    private void handleContent(HttpContent httpContent) {
+      ByteBuf content = httpContent.content();
+      if (state == FetcherState.FETCH_FAILED) {
+        // http content contains the reason why the fetch failed.
+        LOG.error(content.toString(Charset.defaultCharset()));
+        return;
+      }
+
+      try {
+        if (content.isReadable()) {
+          // Count the bytes actually written to the file.
+          totalReceivedContentLength += content.readBytes(fc, content.readableBytes());
+        }
+
+        if (httpContent instanceof LastHttpContent) {
+          fileLen = file.length();
+          fileNum = 1;
+
+          if (totalReceivedContentLength == length) {
+            endFetch(FetcherState.FETCH_DATA_FINISHED);
+          } else {
+            endFetch(FetcherState.FETCH_FAILED);
+            throw new IOException("Invalid fetch length: " + totalReceivedContentLength + ", but expected " + length);
+          }
+          IOUtils.cleanup(LOG, fc, raf);
+        }
+      } catch (Exception e) {
+        LOG.error(e.getMessage(), e);
+      }
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
+        throws Exception {
+      if (cause instanceof ReadTimeoutException) {
+        LOG.warn(cause.getMessage(), cause);
+      } else {
+        LOG.error("Fetch failed :", cause);
+      }
+
+      // this fetching will be retry
+      IOUtils.cleanup(LOG, fc, raf);
+      endFetch(FetcherState.FETCH_FAILED);
+      ctx.close();
+    }
+
+    @Override
+    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
+      if (getState() != FetcherState.FETCH_DATA_FINISHED) {
+        //channel is closed, but cannot complete fetcher
+        endFetch(FetcherState.FETCH_FAILED);
+        LOG.error("Channel closed by peer: " + ctx.channel());
+      }
+      IOUtils.cleanup(LOG, fc, raf);
+
+      super.channelUnregistered(ctx);
+    }
+  }
+
+  /** Pipeline: HTTP codec, content decompression, read timeout, then {@link HttpClientHandler} writing to a file. */
+  public class HttpClientChannelInitializer extends ChannelInitializer<Channel> {
+    private final File file;
+
+    public HttpClientChannelInitializer(File file) {
+      this.file = file;
+    }
+
+    @Override
+    protected void initChannel(Channel channel) throws Exception {
+      ChannelPipeline pipeline = channel.pipeline();
+
+      int maxChunkSize = conf.getIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_CHUNK_MAX_SIZE);
+      int readTimeout = conf.getIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_READ_TIMEOUT);
+
+      pipeline.addLast("codec", new HttpClientCodec(4096, 8192, maxChunkSize));
+      pipeline.addLast("inflater", new HttpContentDecompressor());
+      pipeline.addLast("timeout", new ReadTimeoutHandler(readTimeout, TimeUnit.SECONDS));
+      pipeline.addLast("handler", new HttpClientHandler(file));
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
index de337ae..6296bb0 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
@@ -37,6 +37,7 @@ import org.apache.tajo.engine.function.FunctionLoader;
import org.apache.tajo.master.cluster.WorkerConnectionInfo;
import org.apache.tajo.metrics.Node;
import org.apache.tajo.plan.function.python.PythonScriptEngine;
+import org.apache.tajo.pullserver.PullServerUtil;
import org.apache.tajo.pullserver.TajoPullServerService;
import org.apache.tajo.querymaster.QueryMaster;
import org.apache.tajo.querymaster.QueryMasterManagerService;
@@ -64,7 +65,6 @@ import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.tajo.conf.TajoConf.ConfVars;
@@ -144,7 +144,12 @@ public class TajoWorker extends CompositeService {
queryMasterManagerService = new QueryMasterManagerService(workerContext, qmManagerPort);
addIfService(queryMasterManagerService);
- this.taskManager = new TaskManager(dispatcher, workerContext);
+ if (!PullServerUtil.useExternalPullServerService(systemConf)) {
+ pullService = new TajoPullServerService();
+ addIfService(pullService);
+ }
+
+ this.taskManager = new TaskManager(dispatcher, workerContext, pullService);
addService(taskManager);
this.taskExecutor = new TaskExecutor(workerContext);
@@ -158,21 +163,16 @@ public class TajoWorker extends CompositeService {
addService(new NodeStatusUpdater(workerContext));
int httpPort = 0;
- if(!TajoPullServerService.isStandalone()) {
- pullService = new TajoPullServerService();
- addIfService(pullService);
- }
-
if (!systemConf.getBoolVar(ConfVars.$TEST_MODE)) {
httpPort = initWebServer();
}
super.serviceInit(conf);
- int pullServerPort;
+ int pullServerPort = systemConf.getIntVar(ConfVars.PULLSERVER_PORT);
if(pullService != null){
pullServerPort = pullService.getPort();
- } else {
+ } else if (TajoPullServerService.isStandalone()) {
pullServerPort = getStandAlonePullServerPort();
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
index 19d5da4..4a5b0b4 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
@@ -54,5 +54,5 @@ public interface Task {
TaskHistory createTaskHistory();
- List<Fetcher> getFetchers();
+ List<AbstractFetcher> getFetchers();
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
index 55eb02a..920dfe5 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.TajoProtos;
+import org.apache.tajo.TajoProtos.FetcherState;
import org.apache.tajo.TajoProtos.TaskAttemptState;
import org.apache.tajo.TaskAttemptId;
import org.apache.tajo.catalog.Schema;
@@ -53,7 +54,6 @@ import org.apache.tajo.plan.serder.PlanProto.EnforceProperty;
import org.apache.tajo.plan.serder.PlanProto.EnforceProperty.EnforceType;
import org.apache.tajo.plan.serder.PlanProto.ShuffleType;
import org.apache.tajo.plan.util.PlannerUtil;
-import org.apache.tajo.pullserver.TajoPullServerService;
import org.apache.tajo.pullserver.retriever.FileChunk;
import org.apache.tajo.querymaster.Repartitioner;
import org.apache.tajo.rpc.NullCallback;
@@ -84,7 +84,7 @@ public class TaskImpl implements Task {
private final Path taskDir;
private final TaskAttemptContext context;
- private List<Fetcher> fetcherRunners;
+ private List<AbstractFetcher> fetcherRunners;
private LogicalNode plan;
private PhysicalExec executor;
@@ -269,7 +269,7 @@ public class TaskImpl implements Task {
return taskIdStr1.compareTo(taskIdStr2);
});
- for (Fetcher f : fetcherRunners) {
+ for (AbstractFetcher f : fetcherRunners) {
fetcherExecutor.submit(new FetchRunner(context, f));
}
}
@@ -516,14 +516,14 @@ public class TaskImpl implements Task {
taskHistory.setTotalFetchCount(fetcherRunners.size());
int i = 0;
FetcherHistoryProto.Builder builder = FetcherHistoryProto.newBuilder();
- for (Fetcher fetcher : fetcherRunners) {
+ for (AbstractFetcher fetcher : fetcherRunners) {
builder.setStartTime(fetcher.getStartTime());
builder.setFinishTime(fetcher.getFinishTime());
builder.setFileLength(fetcher.getFileLen());
builder.setMessageReceivedCount(fetcher.getMessageReceiveCount());
builder.setState(fetcher.getState());
taskHistory.addFetcherHistory(builder.build());
- if (fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED) i++;
+ if (fetcher.getState() == FetcherState.FETCH_DATA_FINISHED) i++;
}
taskHistory.setFinishedFetchCount(i);
}
@@ -534,7 +534,7 @@ public class TaskImpl implements Task {
return taskHistory;
}
- public List<Fetcher> getFetchers() {
+ public List<AbstractFetcher> getFetchers() {
return fetcherRunners;
}
@@ -585,10 +585,10 @@ public class TaskImpl implements Task {
private class FetchRunner implements Runnable {
private final TaskAttemptContext ctx;
- private final Fetcher fetcher;
+ private final AbstractFetcher fetcher;
private int maxRetryNum;
- public FetchRunner(TaskAttemptContext ctx, Fetcher fetcher) {
+ public FetchRunner(TaskAttemptContext ctx, AbstractFetcher fetcher) {
this.ctx = ctx;
this.fetcher = fetcher;
this.maxRetryNum = systemConf.getIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_READ_RETRY_MAX_NUM);
@@ -612,7 +612,7 @@ public class TaskImpl implements Task {
}
try {
List<FileChunk> fetched = fetcher.get();
- if (fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED) {
+ if (fetcher.getState() == FetcherState.FETCH_DATA_FINISHED) {
for (FileChunk eachFetch : fetched) {
if (eachFetch.getFile() != null) {
if (!eachFetch.fromRemote()) {
@@ -630,7 +630,7 @@ public class TaskImpl implements Task {
retryNum++;
}
} finally {
- if(fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED){
+ if(fetcher.getState() == FetcherState.FETCH_DATA_FINISHED){
fetcherFinished(ctx);
} else {
if (retryNum == maxRetryNum) {
@@ -669,8 +669,8 @@ public class TaskImpl implements Task {
}
}
- private List<Fetcher> getFetchRunners(TaskAttemptContext ctx,
- List<FetchProto> fetches) throws IOException {
+ private List<AbstractFetcher> getFetchRunners(TaskAttemptContext ctx,
+ List<FetchProto> fetches) throws IOException {
if (fetches.size() > 0) {
Path inputDir = executionBlockContext.getLocalDirAllocator().
@@ -681,7 +681,7 @@ public class TaskImpl implements Task {
File storeDir;
File defaultStoreFile;
List<FileChunk> storeChunkList = new ArrayList<>();
- List<Fetcher> runnerList = Lists.newArrayList();
+ List<AbstractFetcher> runnerList = Lists.newArrayList();
for (FetchProto f : fetches) {
storeDir = new File(inputDir.toString(), f.getName());
@@ -696,43 +696,16 @@ public class TaskImpl implements Task {
WorkerConnectionInfo conn = executionBlockContext.getWorkerContext().getConnectionInfo();
if (NetUtils.isLocalAddress(address) && conn.getPullServerPort() == uri.getPort()) {
-
- List<FileChunk> localChunkCandidates = getLocalStoredFileChunk(uri, systemConf);
-
- for (FileChunk localChunk : localChunkCandidates) {
- // When a range request is out of range, storeChunk will be NULL. This case is normal state.
- // So, we should skip and don't need to create storeChunk.
- if (localChunk == null || localChunk.length() == 0) {
- continue;
- }
-
- if (localChunk.getFile() != null && localChunk.startOffset() > -1) {
- localChunk.setFromRemote(false);
- localStoreChunkCount++;
- } else {
- localChunk = new FileChunk(defaultStoreFile, 0, -1);
- localChunk.setFromRemote(true);
- }
- localChunk.setEbId(f.getName());
- storeChunkList.add(localChunk);
- }
-
+ localStoreChunkCount++;
+ runnerList.add(new LocalFetcher(systemConf, uri, executionBlockContext, f.getName()));
} else {
+ // If we decide that intermediate data should be really fetched from a remote host, storeChunk
+ // represents a complete file. Otherwise, storeChunk may represent a complete file or only a part of it
FileChunk remoteChunk = new FileChunk(defaultStoreFile, 0, -1);
remoteChunk.setFromRemote(true);
remoteChunk.setEbId(f.getName());
- storeChunkList.add(remoteChunk);
- }
-
- // If we decide that intermediate data should be really fetched from a remote host, storeChunk
- // represents a complete file. Otherwise, storeChunk may represent a complete file or only a part of it
- for (FileChunk eachChunk : storeChunkList) {
- Fetcher fetcher = new Fetcher(systemConf, uri, eachChunk);
- runnerList.add(fetcher);
+ runnerList.add(new RemoteFetcher(systemConf, uri, remoteChunk));
i++;
- if (LOG.isDebugEnabled()) {
- LOG.debug("Create a new Fetcher with storeChunk:" + eachChunk.toString());
- }
}
}
}
@@ -745,96 +718,9 @@ public class TaskImpl implements Task {
}
}
- private List<FileChunk> getLocalStoredFileChunk(URI fetchURI, TajoConf conf) throws IOException {
- // Parse the URI
-
- // Parsing the URL into key-values
- final Map<String, List<String>> params = TajoPullServerService.decodeParams(fetchURI.toString());
-
- String partId = params.get("p").get(0);
- String queryId = params.get("qid").get(0);
- String shuffleType = params.get("type").get(0);
- String sid = params.get("sid").get(0);
-
- final List<String> taskIdList = params.get("ta");
- final List<String> offsetList = params.get("offset");
- final List<String> lengthList = params.get("length");
-
- long offset = (offsetList != null && !offsetList.isEmpty()) ? Long.parseLong(offsetList.get(0)) : -1L;
- long length = (lengthList != null && !lengthList.isEmpty()) ? Long.parseLong(lengthList.get(0)) : -1L;
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("PullServer request param: shuffleType=" + shuffleType + ", sid=" + sid + ", partId=" + partId
- + ", taskIds=" + taskIdList);
- }
-
- // The working directory of Tajo worker for each query, including stage
- Path queryBaseDir = TajoPullServerService.getBaseOutputDir(queryId, sid);
-
- List<FileChunk> chunkList = new ArrayList<>();
- // If the stage requires a range shuffle
- if (shuffleType.equals("r")) {
-
- final String startKey = params.get("start").get(0);
- final String endKey = params.get("end").get(0);
- final boolean last = params.get("final") != null;
- final List<String> taskIds = TajoPullServerService.splitMaps(taskIdList);
-
- long before = System.currentTimeMillis();
- for (String eachTaskId : taskIds) {
- Path outputPath = StorageUtil.concatPath(queryBaseDir, eachTaskId, "output");
- if (!executionBlockContext.getLocalDirAllocator().ifExists(outputPath.toString(), conf)) {
- LOG.warn("Range shuffle - file not exist. " + outputPath);
- continue;
- }
- Path path = executionBlockContext.getLocalFS().makeQualified(
- executionBlockContext.getLocalDirAllocator().getLocalPathToRead(outputPath.toString(), conf));
-
- try {
- FileChunk chunk = TajoPullServerService.getFileChunks(queryId, sid, path, startKey, endKey, last);
- chunkList.add(chunk);
- } catch (Throwable t) {
- LOG.error(t.getMessage(), t);
- throw new IOException(t.getCause());
- }
- }
- long after = System.currentTimeMillis();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Index lookup time: " + (after - before) + " ms");
- }
-
- // If the stage requires a hash shuffle or a scattered hash shuffle
- } else if (shuffleType.equals("h") || shuffleType.equals("s")) {
- int partParentId = HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), conf);
- Path partPath = StorageUtil.concatPath(queryBaseDir, "hash-shuffle", String.valueOf(partParentId), partId);
-
- if (!executionBlockContext.getLocalDirAllocator().ifExists(partPath.toString(), conf)) {
- throw new IOException("Hash shuffle or Scattered hash shuffle - file not exist: " + partPath);
- }
- Path path = executionBlockContext.getLocalFS().makeQualified(
- executionBlockContext.getLocalDirAllocator().getLocalPathToRead(partPath.toString(), conf));
- File file = new File(path.toUri());
- long startPos = (offset >= 0 && length >= 0) ? offset : 0;
- long readLen = (offset >= 0 && length >= 0) ? length : file.length();
-
- if (startPos >= file.length()) {
- throw new IOException("Start pos[" + startPos + "] great than file length [" + file.length() + "]");
- }
- FileChunk chunk = new FileChunk(file, startPos, readLen);
- chunkList.add(chunk);
-
- } else {
- throw new IOException("Unknown shuffle type");
- }
-
- return chunkList;
- }
-
public static Path getTaskAttemptDir(TaskAttemptId quid) {
- Path workDir =
- StorageUtil.concatPath(ExecutionBlockContext.getBaseInputDir(quid.getTaskId().getExecutionBlockId()),
- String.valueOf(quid.getTaskId().getId()),
- String.valueOf(quid.getId()));
- return workDir;
+ return StorageUtil.concatPath(ExecutionBlockContext.getBaseInputDir(quid.getTaskId().getExecutionBlockId()),
+ String.valueOf(quid.getTaskId().getId()),
+ String.valueOf(quid.getId()));
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-core/src/main/java/org/apache/tajo/worker/TaskManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskManager.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskManager.java
index 61174b3..efdda9a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskManager.java
@@ -30,6 +30,7 @@ import org.apache.tajo.TajoIdProtos;
import org.apache.tajo.TaskAttemptId;
import org.apache.tajo.TaskId;
import org.apache.tajo.ipc.QueryMasterProtocol;
+import org.apache.tajo.pullserver.TajoPullServerService;
import org.apache.tajo.rpc.AsyncRpcClient;
import org.apache.tajo.rpc.CallFuture;
import org.apache.tajo.rpc.RpcClientManager;
@@ -42,6 +43,7 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Properties;
import static org.apache.tajo.ResourceProtos.*;
@@ -57,12 +59,19 @@ public class TaskManager extends AbstractService implements EventHandler<TaskMan
private final Dispatcher dispatcher;
private TaskExecutor executor;
private final Properties rpcParams;
+ private final TajoPullServerService pullServerService;
- public TaskManager(Dispatcher dispatcher, TajoWorker.WorkerContext workerContext){
- this(dispatcher, workerContext, null);
+ public TaskManager(Dispatcher dispatcher, TajoWorker.WorkerContext workerContext) {
+ this(dispatcher, workerContext, null, null);
}
- public TaskManager(Dispatcher dispatcher, TajoWorker.WorkerContext workerContext, TaskExecutor executor) {
+ public TaskManager(Dispatcher dispatcher, TajoWorker.WorkerContext workerContext,
+ TajoPullServerService pullServerService) {
+ this(dispatcher, workerContext, null, pullServerService);
+ }
+
+ public TaskManager(Dispatcher dispatcher, TajoWorker.WorkerContext workerContext, TaskExecutor executor,
+ TajoPullServerService pullServerService) {
super(TaskManager.class.getName());
this.dispatcher = dispatcher;
@@ -70,6 +79,7 @@ public class TaskManager extends AbstractService implements EventHandler<TaskMan
this.executionBlockContextMap = Maps.newHashMap();
this.executor = executor;
this.rpcParams = RpcParameterFactory.get(this.workerContext.getConf());
+ this.pullServerService = pullServerService;
}
@Override
@@ -124,7 +134,8 @@ public class TaskManager extends AbstractService implements EventHandler<TaskMan
stub.getExecutionBlockContext(callback.getController(), request.build(), callback);
ExecutionBlockContextResponse contextProto = callback.get();
- ExecutionBlockContext context = new ExecutionBlockContext(getWorkerContext(), contextProto, client);
+ ExecutionBlockContext context = new ExecutionBlockContext(getWorkerContext(), contextProto, client,
+ pullServerService);
context.init();
return context;
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-core/src/main/resources/webapps/worker/taskdetail.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/worker/taskdetail.jsp b/tajo-core/src/main/resources/webapps/worker/taskdetail.jsp
index 88b7c24..3eab5cd 100644
--- a/tajo-core/src/main/resources/webapps/worker/taskdetail.jsp
+++ b/tajo-core/src/main/resources/webapps/worker/taskdetail.jsp
@@ -25,7 +25,7 @@
<%@ page import="org.apache.tajo.util.JSPUtil" %>
<%@ page import="org.apache.tajo.util.TajoIdUtils" %>
<%@ page import="org.apache.tajo.webapp.StaticHttpServer" %>
-<%@ page import="org.apache.tajo.worker.Fetcher" %>
+<%@ page import="org.apache.tajo.worker.AbstractFetcher" %>
<%@ page import="org.apache.tajo.worker.TajoWorker" %>
<%@ page import="org.apache.tajo.worker.Task" %>
<%@ page import="org.apache.tajo.worker.TaskHistory" %>
@@ -161,7 +161,7 @@
<th>URI</th>
</tr>
<%
- for (Fetcher eachFetcher : task.getFetchers()) {
+ for (AbstractFetcher eachFetcher : task.getFetchers()) {
%>
<tr>
<td><%=index%>
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-dist/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-dist/pom.xml b/tajo-dist/pom.xml
index 652ab84..7280e1f 100644
--- a/tajo-dist/pom.xml
+++ b/tajo-dist/pom.xml
@@ -160,6 +160,9 @@
run mkdir -p share/jdbc-dist
run cp -r $ROOT/tajo-jdbc/target/tajo-jdbc-${project.version}-jar-with-dependencies.jar ./share/jdbc-dist/tajo-jdbc-${project.version}.jar
+ run mkdir -p share/yarn-dist
+ run cp -r $ROOT/tajo-yarn/target/tajo-yarn-${project.version}-jar-with-dependencies.jar ./share/yarn-dist/tajo-yarn-${project.version}.jar
+
run mkdir -p extlib
echo
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-docs/src/main/sphinx/configuration.rst
----------------------------------------------------------------------
diff --git a/tajo-docs/src/main/sphinx/configuration.rst b/tajo-docs/src/main/sphinx/configuration.rst
index 8b0b8a2..bef49cb 100644
--- a/tajo-docs/src/main/sphinx/configuration.rst
+++ b/tajo-docs/src/main/sphinx/configuration.rst
@@ -10,6 +10,7 @@ Configuration
configuration/tajo_master_configuration
configuration/worker_configuration
configuration/catalog_configuration
+ configuration/pullserver_configuration
configuration/ha_configuration
configuration/service_config_defaults
configuration/tajo-site-xml
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-docs/src/main/sphinx/configuration/cluster_setup.rst
----------------------------------------------------------------------
diff --git a/tajo-docs/src/main/sphinx/configuration/cluster_setup.rst b/tajo-docs/src/main/sphinx/configuration/cluster_setup.rst
index e9d17eb..3835512 100644
--- a/tajo-docs/src/main/sphinx/configuration/cluster_setup.rst
+++ b/tajo-docs/src/main/sphinx/configuration/cluster_setup.rst
@@ -1,9 +1,9 @@
-*******************************************
+*************
Cluster Setup
-*******************************************
+*************
Fully Distributed Mode
-==========================================
+======================
A fully distributed mode enables a Tajo instance to run on `Hadoop Distributed File System (HDFS) <http://wiki.apache.org/hadoop/HDFS>`_. In this mode, a number of Tajo workers run across a number of the physical nodes where HDFS data nodes run.
@@ -11,7 +11,7 @@ In this section, we explain how to setup the cluster mode.
Settings
---------------------------------------------------------
+--------
Please add the following configs to tajo-site.xml file:
@@ -43,7 +43,7 @@ Please add the following configs to tajo-site.xml file:
</property>
Workers
---------------------------------------------------------
+-------
The file ``conf/workers`` lists all host names of workers, one per line.
By default, this file contains the single entry ``localhost``.
@@ -59,7 +59,7 @@ For example: ::
<ctrl + d>
Make base directories and set permissions
---------------------------------------------------------
+-----------------------------------------
If you want to know Tajo’s configuration in more detail, see Configuration page.
Before launching the tajo, you should create the tajo root dir and set the permission as follows: ::
@@ -69,7 +69,7 @@ Before launching the tajo, you should create the tajo root dir and set the permi
Launch a Tajo cluster
---------------------------------------------------------
+---------------------
Then, execute ``start-tajo.sh`` ::
@@ -77,10 +77,10 @@ Then, execute ``start-tajo.sh`` ::
.. note::
- In default, each worker is set to very little resource capacity. In order to increase parallel degree, please read
+ By default, each worker is configured with very little resource capacity. To increase the degree of parallelism, please read
:doc:`/configuration/worker_configuration`.
.. note::
- In default, TajoMaster listens on 127.0.0.1 for clients. To allow remote clients to access TajoMaster, please set tajo.master.client-rpc.address config to tajo-site.xml. In order to know how to change the listen port, please refer :doc:`/configuration/service_config_defaults`.
+ By default, TajoMaster listens on 127.0.0.1 for clients. To allow remote clients to access TajoMaster, please set the tajo.master.client-rpc.address config in tajo-site.xml. To learn how to change the listen port, please refer to :doc:`/configuration/service_config_defaults`.
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-docs/src/main/sphinx/configuration/pullserver_configuration.rst
----------------------------------------------------------------------
diff --git a/tajo-docs/src/main/sphinx/configuration/pullserver_configuration.rst b/tajo-docs/src/main/sphinx/configuration/pullserver_configuration.rst
new file mode 100644
index 0000000..68760fc
--- /dev/null
+++ b/tajo-docs/src/main/sphinx/configuration/pullserver_configuration.rst
@@ -0,0 +1,75 @@
+*************************
+Pull Server Configuration
+*************************
+
+Pull servers are responsible for transmitting data among Tajo workers during shuffle phases. Tajo provides several modes
+for pull servers.
+
+Internal Mode (Default)
+=======================
+
+In internal mode, each worker also acts as a pull server. Therefore, workers must transmit data during shuffle phases
+in addition to processing data during processing phases.
+
+Standalone Mode
+===============
+
+Data shuffling can require a large amount of memory and CPU time.
+This can slow down query processing, because Tajo's query engine must compete with pull servers for limited resources.
+Tajo provides the standalone mode to avoid this unnecessary contention.
+
+In this mode, each pull server is executed as a separate process. To enable this mode, you need to add the following
+line to ``${TAJO_HOME}/conf/tajo-env.sh``.
+
+.. code-block:: sh
+
+ export TAJO_PULLSERVER_STANDALONE=true
+
+Then, you will see messages like the following when you start the Tajo cluster.
+
+.. code-block:: sh
+
+ Starting single TajoMaster
+ starting master, logging to ...
+ 192.168.10.1: starting pullserver, logging to ...
+ 192.168.10.1: starting worker, logging to ...
+ 192.168.10.2: starting pullserver, logging to ...
+ 192.168.10.2: starting worker, logging to ...
+ ...
+
+.. warning::
+
+ Currently, only one pull server should run on each machine.
+
+Yarn Auxiliary Service Mode
+===========================
+
+You can run pull servers as one of Yarn's auxiliary services. To do so, you need to add the following configurations
+to ``${HADOOP_CONF}/yarn-site.xml``.
+
+.. code-block:: xml
+
+ <property>
+ <name>yarn.nodemanager.aux-services</name>
+ <value>mapreduce_shuffle,tajo_shuffle</value>
+ </property>
+
+ <property>
+ <name>yarn.nodemanager.aux-services.tajo_shuffle.class</name>
+ <value>org.apache.tajo.yarn.TajoPullServerService</value>
+ </property>
+
+ <property>
+ <name>tajo.pullserver.port</name>
+ <value>port number</value>
+ </property>
+
+Optionally, you can add the following configuration to specify temporary directories. For details on this configuration,
+please refer to :doc:`/configuration/worker_configuration`.
+
+.. code-block:: xml
+
+ <property>
+ <name>tajo.worker.tmpdir.locations</name>
+ <value>/path/to/tajo/temporary/directory</value>
+ </property>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-project/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-project/pom.xml b/tajo-project/pom.xml
index ee1317f..4818983 100644
--- a/tajo-project/pom.xml
+++ b/tajo-project/pom.xml
@@ -832,6 +832,11 @@
</dependency>
<dependency>
<groupId>org.apache.tajo</groupId>
+ <artifactId>tajo-yarn</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tajo</groupId>
<artifactId>tajo-client</artifactId>
<version>${project.version}</version>
</dependency>
@@ -991,13 +996,6 @@
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-yarn-server-tests</artifactId>
- <version>${hadoop.version}</version>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-shuffle</artifactId>
<version>${hadoop.version}</version>
<exclusions>
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-pullserver/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-pullserver/pom.xml b/tajo-pullserver/pom.xml
index 2a1a745..6c805ac 100644
--- a/tajo-pullserver/pom.xml
+++ b/tajo-pullserver/pom.xml
@@ -56,12 +56,12 @@
</dependency>
<dependency>
<groupId>org.apache.tajo</groupId>
- <artifactId>tajo-rpc-protobuf</artifactId>
+ <artifactId>tajo-catalog-common</artifactId>
+ <scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.tajo</groupId>
- <artifactId>tajo-catalog-common</artifactId>
- <scope>provided</scope>
+ <artifactId>tajo-rpc-protobuf</artifactId>
</dependency>
<dependency>
<groupId>org.apache.tajo</groupId>
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedFileRegion.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedFileRegion.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedFileRegion.java
index 643d9e0..07f0e56 100644
--- a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedFileRegion.java
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedFileRegion.java
@@ -151,6 +151,7 @@ public class FadvisedFileRegion extends DefaultFileRegion {
protected void deallocate() {
if (readaheadRequest != null) {
readaheadRequest.cancel();
+ readaheadRequest = null;
}
super.deallocate();
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FileCloseListener.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FileCloseListener.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FileCloseListener.java
index 9c3c523..7f97542 100644
--- a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FileCloseListener.java
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FileCloseListener.java
@@ -24,18 +24,9 @@ import io.netty.util.concurrent.GenericFutureListener;
public class FileCloseListener implements GenericFutureListener<ChannelFuture> {
private FadvisedFileRegion filePart;
- private String requestUri;
- private TajoPullServerService pullServerService;
- private long startTime;
- public FileCloseListener(FadvisedFileRegion filePart,
- String requestUri,
- long startTime,
- TajoPullServerService pullServerService) {
+ public FileCloseListener(FadvisedFileRegion filePart) {
this.filePart = filePart;
- this.requestUri = requestUri;
- this.pullServerService = pullServerService;
- this.startTime = startTime;
}
// TODO error handling; distinguish IO/connection failures,
@@ -46,8 +37,5 @@ public class FileCloseListener implements GenericFutureListener<ChannelFuture> {
filePart.transferSuccessful();
}
filePart.deallocate();
- if (pullServerService != null) {
- pullServerService.completeFileChunk(filePart, requestUri, startTime);
- }
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerConstants.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerConstants.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerConstants.java
new file mode 100644
index 0000000..74f96e7
--- /dev/null
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerConstants.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.pullserver;
+
+/**
+ * Constants shared by pull server implementations and their clients: request query
+ * parameter names, request/shuffle type codes, HTTP header names, and configuration keys.
+ */
+public class PullServerConstants {
+
+  /**
+   * Pull server query parameters. Each constant carries the short key that is used
+   * in the request URI's query string.
+   */
+  public enum Param {
+    // Common params
+    REQUEST_TYPE("rtype"), // can be one of 'm' for meta and 'c' for chunk.
+    SHUFFLE_TYPE("stype"), // can be one of 'r', 'h', and 's'.
+    QUERY_ID("qid"),
+    EB_ID("sid"),
+    PART_ID("p"),
+    TASK_ID("ta"),
+    OFFSET("offset"),
+    LENGTH("length"),
+
+    // Range shuffle params
+    START("start"),
+    END("end"),
+    FINAL("final");
+
+    // Enum constants are process-wide singletons, so their key must be immutable.
+    private final String key;
+
+    Param(String key) {
+      this.key = key;
+    }
+
+    /**
+     * Returns the query-string key of this parameter.
+     *
+     * @return the short key, e.g. {@code "qid"} for {@link #QUERY_ID}
+     */
+    public String key() {
+      return key;
+    }
+  }
+
+  // Request types ----------------------------------------------------------
+
+  /** Value of {@link Param#REQUEST_TYPE} that requests chunk data. */
+  public static final String CHUNK_REQUEST_PARAM_STRING = "c";
+  /** Value of {@link Param#REQUEST_TYPE} that requests metadata. */
+  public static final String META_REQUEST_PARAM_STRING = "m";
+
+  // Shuffle types ----------------------------------------------------------
+
+  /** Value of {@link Param#SHUFFLE_TYPE} for range shuffles. */
+  public static final String RANGE_SHUFFLE_PARAM_STRING = "r";
+  /** Value of {@link Param#SHUFFLE_TYPE} for hash shuffles. */
+  public static final String HASH_SHUFFLE_PARAM_STRING = "h";
+  /** Value of {@link Param#SHUFFLE_TYPE} for scattered hash shuffles. */
+  public static final String SCATTERED_HASH_SHUFFLE_PARAM_STRING = "s";
+
+  // HTTP header ------------------------------------------------------------
+
+  /** Name of the HTTP header that carries a chunk length. */
+  public static final String CHUNK_LENGTH_HEADER_NAME = "c";
+
+  // SSL configurations -----------------------------------------------------
+
+  /** Default SSL file buffer size (60 * 1024). */
+  public static final int DEFAULT_SHUFFLE_SSL_FILE_BUFFER_SIZE = 60 * 1024;
+
+  /** Configuration key overriding {@link #DEFAULT_SHUFFLE_SSL_FILE_BUFFER_SIZE}. */
+  public static final String SHUFFLE_SSL_FILE_BUFFER_SIZE_KEY =
+      "tajo.pullserver.ssl.file.buffer.size";
+
+  /**
+   * @deprecated misspelled name retained for source compatibility;
+   *             use {@link #DEFAULT_SHUFFLE_SSL_FILE_BUFFER_SIZE}.
+   */
+  @Deprecated
+  public static final int DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE = DEFAULT_SHUFFLE_SSL_FILE_BUFFER_SIZE;
+
+  /**
+   * @deprecated misspelled name retained for source compatibility;
+   *             use {@link #SHUFFLE_SSL_FILE_BUFFER_SIZE_KEY}.
+   */
+  @Deprecated
+  public static final String SUFFLE_SSL_FILE_BUFFER_SIZE_KEY = SHUFFLE_SSL_FILE_BUFFER_SIZE_KEY;
+
+  // OS cache configurations ------------------------------------------------
+
+  /** Configuration key toggling OS cache management by the pull server. */
+  public static final String SHUFFLE_MANAGE_OS_CACHE = "tajo.pullserver.manage.os.cache";
+  /** Default for {@link #SHUFFLE_MANAGE_OS_CACHE}. */
+  public static final boolean DEFAULT_SHUFFLE_MANAGE_OS_CACHE = true;
+
+  // Prefetch configurations ------------------------------------------------
+
+  /** Configuration key for the readahead size. */
+  public static final String SHUFFLE_READAHEAD_BYTES = "tajo.pullserver.readahead.bytes";
+  /** Default for {@link #SHUFFLE_READAHEAD_BYTES} (4 * 1024 * 1024). */
+  public static final int DEFAULT_SHUFFLE_READAHEAD_BYTES = 4 * 1024 * 1024;
+
+  // Yarn service ID --------------------------------------------------------
+
+  public static final String PULLSERVER_SERVICEID = "tajo.pullserver";
+
+  // Standalone pull server -------------------------------------------------
+
+  /** Environment variable that, when set to {@code true}, enables the standalone pull server. */
+  public static final String PULLSERVER_STANDALONE_ENV_KEY = "TAJO_PULLSERVER_STANDALONE";
+
+  public static final String PULLSERVER_SERVICE_NAME = "httpshuffle";
+}
[4/4] tajo git commit: TAJO-2122: PullServer as an Auxiliary service
of Yarn.
Posted by ji...@apache.org.
TAJO-2122: PullServer as an Auxiliary service of Yarn.
Closes #1001
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/73ac4b87
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/73ac4b87
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/73ac4b87
Branch: refs/heads/master
Commit: 73ac4b87d0f7389b79be8a847e4215fc59befaff
Parents: 71193b2
Author: Jihoon Son <ji...@apache.org>
Authored: Wed Apr 27 13:26:28 2016 +0900
Committer: Jihoon Son <ji...@apache.org>
Committed: Wed Apr 27 13:26:28 2016 +0900
----------------------------------------------------------------------
CHANGES | 2 +
pom.xml | 3 +-
.../org/apache/tajo/TajoTestingCluster.java | 2 +-
.../java/org/apache/tajo/conf/TajoConf.java | 1 +
.../org/apache/tajo/exception/ErrorUtil.java | 14 +-
tajo-common/src/main/proto/tajo_protos.proto | 8 +-
tajo-core-tests/pom.xml | 4 +
.../physical/TestProgressExternalSortExec.java | 2 -
.../apache/tajo/master/TestRepartitioner.java | 10 +-
.../apache/tajo/querymaster/TestKillQuery.java | 3 +-
.../apache/tajo/worker/MockExecutionBlock.java | 42 --
.../tajo/worker/MockExecutionBlockContext.java | 42 ++
.../apache/tajo/worker/MockTaskExecutor.java | 2 +-
.../org/apache/tajo/worker/MockTaskManager.java | 3 +-
.../org/apache/tajo/worker/TestFetcher.java | 236 -------
.../worker/TestFetcherWithTajoPullServer.java | 437 ++++++++++++
.../apache/tajo/worker/TestTaskExecutor.java | 2 +-
.../apache/tajo/querymaster/Repartitioner.java | 96 +--
.../org/apache/tajo/worker/AbstractFetcher.java | 89 +++
.../tajo/worker/ExecutionBlockContext.java | 10 +-
.../worker/ExecutionBlockSharedResource.java | 16 +
.../java/org/apache/tajo/worker/Fetcher.java | 356 ----------
.../org/apache/tajo/worker/LocalFetcher.java | 480 +++++++++++++
.../org/apache/tajo/worker/RemoteFetcher.java | 317 +++++++++
.../java/org/apache/tajo/worker/TajoWorker.java | 18 +-
.../main/java/org/apache/tajo/worker/Task.java | 2 +-
.../java/org/apache/tajo/worker/TaskImpl.java | 156 +----
.../org/apache/tajo/worker/TaskManager.java | 19 +-
.../resources/webapps/worker/taskdetail.jsp | 4 +-
tajo-dist/pom.xml | 3 +
tajo-docs/src/main/sphinx/configuration.rst | 1 +
.../main/sphinx/configuration/cluster_setup.rst | 18 +-
.../configuration/pullserver_configuration.rst | 75 ++
tajo-project/pom.xml | 12 +-
tajo-pullserver/pom.xml | 6 +-
.../tajo/pullserver/FadvisedFileRegion.java | 1 +
.../tajo/pullserver/FileCloseListener.java | 14 +-
.../tajo/pullserver/PullServerConstants.java | 93 +++
.../apache/tajo/pullserver/PullServerUtil.java | 688 ++++++++++++++++++-
.../apache/tajo/pullserver/TajoPullServer.java | 5 -
.../tajo/pullserver/TajoPullServerService.java | 683 +++++-------------
.../tajo/pullserver/retriever/FileChunk.java | 4 +-
.../pullserver/retriever/FileChunkMeta.java | 53 ++
.../pullserver/retriever/IndexCacheKey.java | 63 ++
tajo-yarn/pom.xml | 265 +++++++
.../apache/tajo/yarn/FadvisedChunkedFile.java | 82 +++
.../apache/tajo/yarn/FadvisedFileRegion.java | 173 +++++
.../org/apache/tajo/yarn/FileCloseListener.java | 41 ++
.../apache/tajo/yarn/TajoPullServerService.java | 608 ++++++++++++++++
49 files changed, 3794 insertions(+), 1470 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 38aedda..b729cce 100644
--- a/CHANGES
+++ b/CHANGES
@@ -4,6 +4,8 @@ Release 0.12.0 - unreleased
NEW FEATURES
+ TAJO-2122: PullServer as an Auxiliary service of Yarn. (jihoon)
+
TAJO-2109: Implement Radix sort. (jihoon)
TAJO-1955: Add a feature to strip quotes from CSV file. (hyunsik)
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index d255652..71c062b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -92,12 +92,13 @@
<module>tajo-sql-parser</module>
<module>tajo-storage</module>
<module>tajo-pullserver</module>
- <module>tajo-dist</module>
+ <module>tajo-yarn</module>
<module>tajo-thirdparty/asm</module>
<module>tajo-cli</module>
<module>tajo-metrics</module>
<module>tajo-core-tests</module>
<module>tajo-cluster-tests</module>
+ <module>tajo-dist</module>
</modules>
<build>
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-cluster-tests/src/test/java/org/apache/tajo/TajoTestingCluster.java
----------------------------------------------------------------------
diff --git a/tajo-cluster-tests/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-cluster-tests/src/test/java/org/apache/tajo/TajoTestingCluster.java
index 4e7d236..b1a3306 100644
--- a/tajo-cluster-tests/src/test/java/org/apache/tajo/TajoTestingCluster.java
+++ b/tajo-cluster-tests/src/test/java/org/apache/tajo/TajoTestingCluster.java
@@ -162,7 +162,7 @@ public class TajoTestingCluster {
conf.setInt(ConfVars.$EXECUTOR_EXTERNAL_SORT_BUFFER_SIZE.varname, 1);
conf.setInt(ConfVars.$EXECUTOR_HASH_SHUFFLE_BUFFER_SIZE.varname, 1);
- /** decrease Hbase thread and memory cache for testing */
+ /* decrease Hbase thread and memory cache for testing */
//server handler
conf.setInt("hbase.regionserver.handler.count", 5);
//client handler
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index 2e2fb18..440af80 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -214,6 +214,7 @@ public class TajoConf extends Configuration {
PULLSERVER_CACHE_TIMEOUT("tajo.pullserver.index-cache.timeout-min", 5, Validators.min("1")),
PULLSERVER_FETCH_URL_MAX_LENGTH("tajo.pullserver.fetch-url.max-length", StorageUnit.KB,
Validators.min("1")),
+ YARN_SHUFFLE_SERVICE_ENABLED("tajo.shuffle.yarn-service.enabled", false, Validators.bool()),
SHUFFLE_SSL_ENABLED_KEY("tajo.pullserver.ssl.enabled", false, Validators.bool()),
SHUFFLE_FILE_FORMAT("tajo.shuffle.file-format", BuiltinStorages.RAW, Validators.javaString()),
SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM("tajo.shuffle.fetcher.parallel-execution.max-num",
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-common/src/main/java/org/apache/tajo/exception/ErrorUtil.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/exception/ErrorUtil.java b/tajo-common/src/main/java/org/apache/tajo/exception/ErrorUtil.java
index 9a71bd6..957b3d1 100644
--- a/tajo-common/src/main/java/org/apache/tajo/exception/ErrorUtil.java
+++ b/tajo-common/src/main/java/org/apache/tajo/exception/ErrorUtil.java
@@ -34,12 +34,14 @@ public class ErrorUtil {
public static Stacktrace.StackTrace convertStacktrace(Throwable t) {
Stacktrace.StackTrace.Builder builder = Stacktrace.StackTrace.newBuilder();
- for (StackTraceElement element : t.getStackTrace()) {
- builder.addElement(Stacktrace.StackTrace.Element.newBuilder()
- .setFilename(element.getFileName() == null ? "(Unknown Source)" : element.getFileName())
- .setFunction(element.getClassName() + "::" + element.getMethodName())
- .setLine(element.getLineNumber())
- );
+ if (t != null) {
+ for (StackTraceElement element : t.getStackTrace()) {
+ builder.addElement(Stacktrace.StackTrace.Element.newBuilder()
+ .setFilename(element.getFileName() == null ? "(Unknown Source)" : element.getFileName())
+ .setFunction(element.getClassName() + "::" + element.getMethodName())
+ .setLine(element.getLineNumber())
+ );
+ }
}
return builder.build();
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-common/src/main/proto/tajo_protos.proto
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/proto/tajo_protos.proto b/tajo-common/src/main/proto/tajo_protos.proto
index 1795107..78d48c4 100644
--- a/tajo-common/src/main/proto/tajo_protos.proto
+++ b/tajo-common/src/main/proto/tajo_protos.proto
@@ -49,9 +49,11 @@ enum TaskAttemptState {
enum FetcherState {
FETCH_INIT = 0;
- FETCH_FETCHING = 1;
- FETCH_FINISHED = 2;
- FETCH_FAILED = 3;
+ FETCH_META_FETCHING = 1;
+ FETCH_META_FINISHED = 2;
+ FETCH_DATA_FETCHING = 3;
+ FETCH_DATA_FINISHED = 4;
+ FETCH_FAILED = 5;
}
message WorkerConnectionInfoProto {
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-core-tests/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-core-tests/pom.xml b/tajo-core-tests/pom.xml
index b12642a..3554df4 100644
--- a/tajo-core-tests/pom.xml
+++ b/tajo-core-tests/pom.xml
@@ -174,6 +174,10 @@
</dependency>
<dependency>
<groupId>org.apache.tajo</groupId>
+ <artifactId>tajo-yarn</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tajo</groupId>
<artifactId>tajo-rpc-protobuf</artifactId>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java
index eeb179f..51cd5ea 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java
@@ -173,13 +173,11 @@ public class TestProgressExternalSortExec {
while ((tuple = exec.next()) != null) {
if (cnt == 0) {
initProgress = exec.getProgress();
- System.out.println(initProgress);
assertTrue(initProgress > 0.5f && initProgress < 1.0f);
}
if (cnt == testDataStats.getNumRows() / 2) {
float progress = exec.getProgress();
- System.out.println(progress);
assertTrue(progress > initProgress);
}
curVal = tuple;
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-core-tests/src/test/java/org/apache/tajo/master/TestRepartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/master/TestRepartitioner.java b/tajo-core-tests/src/test/java/org/apache/tajo/master/TestRepartitioner.java
index a13a750..abec6a0 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/master/TestRepartitioner.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/master/TestRepartitioner.java
@@ -20,12 +20,13 @@ package org.apache.tajo.master;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-import io.netty.handler.codec.http.QueryStringDecoder;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.LocalTajoTestingUtility;
import org.apache.tajo.QueryId;
import org.apache.tajo.ResourceProtos.FetchProto;
import org.apache.tajo.TestTajoIds;
+import org.apache.tajo.pullserver.PullServerConstants;
+import org.apache.tajo.pullserver.PullServerUtil.PullServerParams;
import org.apache.tajo.querymaster.Repartitioner;
import org.apache.tajo.querymaster.Task;
import org.apache.tajo.querymaster.Task.IntermediateEntry;
@@ -88,12 +89,11 @@ public class TestRepartitioner {
assertEquals(1, uris.size()); //In Hash Suffle, Fetcher return only one URI per partition.
URI uri = uris.get(0);
- final Map<String, List<String>> params =
- new QueryStringDecoder(uri).parameters();
+ final PullServerParams params = new PullServerParams(uri);
assertEquals(eachEntry.getKey().toString(), params.get("p").get(0));
- assertEquals("h", params.get("type").get(0));
- assertEquals("" + sid.getId(), params.get("sid").get(0));
+ assertEquals(PullServerConstants.HASH_SHUFFLE_PARAM_STRING, params.shuffleType());
+ assertEquals("" + sid.getId(), params.ebId());
}
Map<Integer, Map<ExecutionBlockId, List<IntermediateEntry>>> mergedHashEntries =
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java b/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
index ac5efd9..8d33dbc 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
@@ -26,7 +26,6 @@ import org.apache.tajo.*;
import org.apache.tajo.ResourceProtos.ExecutionBlockContextResponse;
import org.apache.tajo.algebra.Expr;
import org.apache.tajo.catalog.CatalogService;
-import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.client.TajoClient;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.parser.sql.SQLAnalyzer;
@@ -269,7 +268,7 @@ public class TestKillQuery {
}
};
- ExecutionBlockContext context = new MockExecutionBlock(workerContext, requestProtoBuilder.build()) {
+ ExecutionBlockContext context = new MockExecutionBlockContext(workerContext, requestProtoBuilder.build()) {
@Override
public Path createBaseDir() throws IOException {
return new Path("test");
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockExecutionBlock.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockExecutionBlock.java b/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockExecutionBlock.java
deleted file mode 100644
index cbc4312..0000000
--- a/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockExecutionBlock.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.worker;
-
-import org.apache.tajo.ResourceProtos.ExecutionBlockContextResponse;
-import org.apache.tajo.TaskAttemptId;
-
-import java.io.IOException;
-
-public class MockExecutionBlock extends ExecutionBlockContext {
-
- public MockExecutionBlock(TajoWorker.WorkerContext workerContext,
- ExecutionBlockContextResponse request) throws IOException {
- super(workerContext, request, null);
- }
-
- @Override
- public void init() throws Throwable {
- //skip
- }
-
- @Override
- public void fatalError(TaskAttemptId taskAttemptId, Throwable throwable) {
-
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockExecutionBlockContext.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockExecutionBlockContext.java b/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockExecutionBlockContext.java
new file mode 100644
index 0000000..b64ab9b
--- /dev/null
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockExecutionBlockContext.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import org.apache.tajo.ResourceProtos.ExecutionBlockContextResponse;
+import org.apache.tajo.TaskAttemptId;
+
+import java.io.IOException;
+/** Test mock of {@link ExecutionBlockContext}: init() does nothing and fatal errors are ignored. */
+public class MockExecutionBlockContext extends ExecutionBlockContext {
+
+ public MockExecutionBlockContext(TajoWorker.WorkerContext workerContext,
+ ExecutionBlockContextResponse request) throws IOException {
+ super(workerContext, request, null, null); // last two args null; presumably unused by the tests — confirm
+ }
+
+ @Override
+ public void init() throws Throwable {
+ //skip: real initialization is bypassed in this mock
+ }
+
+ @Override
+ public void fatalError(TaskAttemptId taskAttemptId, Throwable throwable) {
+ // intentionally a no-op: fatal errors are not reported by this mock
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockTaskExecutor.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockTaskExecutor.java b/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockTaskExecutor.java
index 071d26a..ea609b1 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockTaskExecutor.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockTaskExecutor.java
@@ -145,7 +145,7 @@ public class MockTaskExecutor extends TaskExecutor {
}
@Override
- public List<Fetcher> getFetchers() {
+ public List<AbstractFetcher> getFetchers() {
return null;
}
};
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockTaskManager.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockTaskManager.java b/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockTaskManager.java
index 5979bbb..0e114bb 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockTaskManager.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockTaskManager.java
@@ -29,7 +29,6 @@ import org.apache.tajo.worker.event.TaskManagerEvent;
import java.io.IOException;
import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeoutException;
public class MockTaskManager extends TaskManager {
@@ -61,7 +60,7 @@ public class MockTaskManager extends TaskManager {
.setQueryContext(new QueryContext(new TajoConf()).getProto())
.setQueryOutputPath("testpath")
.setShuffleType(PlanProto.ShuffleType.HASH_SHUFFLE);
- return new MockExecutionBlock(getWorkerContext(), builder.build());
+ return new MockExecutionBlockContext(getWorkerContext(), builder.build());
} catch (IOException e) {
throw new RuntimeException(e);
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestFetcher.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestFetcher.java b/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestFetcher.java
deleted file mode 100644
index dfc37b0..0000000
--- a/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestFetcher.java
+++ /dev/null
@@ -1,236 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.worker;
-
-import org.apache.hadoop.fs.*;
-import org.apache.tajo.QueryId;
-import org.apache.tajo.QueryIdFactory;
-import org.apache.tajo.TajoProtos;
-import org.apache.tajo.TajoTestingCluster;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.conf.TajoConf.ConfVars;
-import org.apache.tajo.pullserver.TajoPullServerService;
-import org.apache.tajo.pullserver.retriever.FileChunk;
-import org.apache.tajo.storage.HashShuffleAppenderManager;
-import org.apache.tajo.util.CommonTestingUtil;
-import org.junit.*;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.URI;
-import java.util.Random;
-
-import static org.junit.Assert.*;
-
-public class TestFetcher {
- private String TEST_DATA = TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/TestFetcher";
- private String INPUT_DIR = TEST_DATA+"/in/";
- private String OUTPUT_DIR = TEST_DATA+"/out/";
- private TajoConf conf = new TajoConf();
- private TajoPullServerService pullServerService;
-
- @Before
- public void setUp() throws Exception {
- CommonTestingUtil.getTestDir(TEST_DATA);
- CommonTestingUtil.getTestDir(INPUT_DIR);
- CommonTestingUtil.getTestDir(OUTPUT_DIR);
- conf.setVar(TajoConf.ConfVars.WORKER_TEMPORAL_DIR, INPUT_DIR);
- conf.setIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_READ_TIMEOUT, 1);
- conf.setIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_CHUNK_MAX_SIZE, 127);
-
- pullServerService = new TajoPullServerService();
- pullServerService.init(conf);
- pullServerService.start();
- }
-
- @After
- public void tearDown(){
- pullServerService.stop();
- }
-
- @Test
- public void testGet() throws IOException {
- Random rnd = new Random();
- QueryId queryId = QueryIdFactory.NULL_QUERY_ID;
- String sid = "1";
- String partId = "1";
-
- int partParentId = HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), conf);
- String dataPath = conf.getVar(ConfVars.WORKER_TEMPORAL_DIR) +
- queryId.toString() + "/output/" + sid + "/hash-shuffle/" + partParentId + "/" + partId;
-
- String params = String.format("qid=%s&sid=%s&p=%s&type=%s", queryId, sid, partId, "h");
-
- Path inputPath = new Path(dataPath);
- FSDataOutputStream stream = FileSystem.getLocal(conf).create(inputPath, true);
- for (int i = 0; i < 100; i++) {
- String data = ""+rnd.nextInt();
- stream.write(data.getBytes());
- }
- stream.flush();
- stream.close();
-
- URI uri = URI.create("http://127.0.0.1:" + pullServerService.getPort() + "/?" + params);
- FileChunk storeChunk = new FileChunk(new File(OUTPUT_DIR + "data"), 0, 0);
- storeChunk.setFromRemote(true);
- final Fetcher fetcher = new Fetcher(conf, uri, storeChunk);
- FileChunk chunk = fetcher.get().get(0);
- assertNotNull(chunk);
- assertNotNull(chunk.getFile());
-
- FileSystem fs = FileSystem.getLocal(new TajoConf());
- FileStatus inStatus = fs.getFileStatus(inputPath);
- FileStatus outStatus = fs.getFileStatus(new Path(OUTPUT_DIR, "data"));
-
- assertEquals(inStatus.getLen(), outStatus.getLen());
- assertEquals(TajoProtos.FetcherState.FETCH_FINISHED, fetcher.getState());
- }
-
- @Test
- public void testAdjustFetchProcess() {
- assertEquals(0.0f, TaskImpl.adjustFetchProcess(0, 0), 0);
- assertEquals(0.0f, TaskImpl.adjustFetchProcess(10, 10), 0);
- assertEquals(0.05f, TaskImpl.adjustFetchProcess(10, 9), 0);
- assertEquals(0.1f, TaskImpl.adjustFetchProcess(10, 8), 0);
- assertEquals(0.25f, TaskImpl.adjustFetchProcess(10, 5), 0);
- assertEquals(0.45f, TaskImpl.adjustFetchProcess(10, 1), 0);
- assertEquals(0.5f, TaskImpl.adjustFetchProcess(10, 0), 0);
- }
-
- @Test
- public void testStatus() throws Exception {
- Random rnd = new Random();
- QueryId queryId = QueryIdFactory.NULL_QUERY_ID;
- String sid = "1";
- String ta = "1_0";
- String partId = "1";
-
- String dataPath = INPUT_DIR + queryId.toString() + "/output"+ "/" + sid + "/" +ta + "/output/" + partId;
- String params = String.format("qid=%s&sid=%s&p=%s&type=%s&ta=%s", queryId, sid, partId, "h", ta);
-
- FSDataOutputStream stream = FileSystem.getLocal(conf).create(new Path(dataPath), true);
- for (int i = 0; i < 100; i++) {
- String data = ""+rnd.nextInt();
- stream.write(data.getBytes());
- }
- stream.flush();
- stream.close();
-
- URI uri = URI.create("http://127.0.0.1:" + pullServerService.getPort() + "/?" + params);
- FileChunk storeChunk = new FileChunk(new File(OUTPUT_DIR + "data"), 0, 0);
- storeChunk.setFromRemote(true);
- final Fetcher fetcher = new Fetcher(conf, uri, storeChunk);
- assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState());
-
- fetcher.get();
- assertEquals(TajoProtos.FetcherState.FETCH_FINISHED, fetcher.getState());
- }
-
- @Test
- public void testNoContentFetch() throws Exception {
-
- QueryId queryId = QueryIdFactory.NULL_QUERY_ID;
- String sid = "1";
- String ta = "1_0";
- String partId = "1";
-
- String dataPath = INPUT_DIR + queryId.toString() + "/output"+ "/" + sid + "/" +ta + "/output/" + partId;
- String params = String.format("qid=%s&sid=%s&p=%s&type=%s&ta=%s", queryId, sid, partId, "h", ta);
-
- Path inputPath = new Path(dataPath);
- FileSystem fs = FileSystem.getLocal(conf);
- if(fs.exists(inputPath)){
- fs.delete(new Path(dataPath), true);
- }
-
- FSDataOutputStream stream = FileSystem.getLocal(conf).create(new Path(dataPath).getParent(), true);
- stream.close();
-
- URI uri = URI.create("http://127.0.0.1:" + pullServerService.getPort() + "/?" + params);
- FileChunk storeChunk = new FileChunk(new File(OUTPUT_DIR + "data"), 0, 0);
- storeChunk.setFromRemote(true);
- final Fetcher fetcher = new Fetcher(conf, uri, storeChunk);
- assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState());
-
- fetcher.get();
- assertEquals(TajoProtos.FetcherState.FETCH_FINISHED, fetcher.getState());
- }
-
- @Test
- public void testFailureStatus() throws Exception {
- Random rnd = new Random();
-
- QueryId queryId = QueryIdFactory.NULL_QUERY_ID;
- String sid = "1";
- String ta = "1_0";
- String partId = "1";
-
- String dataPath = INPUT_DIR + queryId.toString() + "/output"+ "/" + sid + "/" +ta + "/output/" + partId;
-
- //TajoPullServerService will be throws BAD_REQUEST by Unknown shuffle type
- String shuffleType = "x";
- String params = String.format("qid=%s&sid=%s&p=%s&type=%s&ta=%s", queryId, sid, partId, shuffleType, ta);
-
- FSDataOutputStream stream = FileSystem.getLocal(conf).create(new Path(dataPath), true);
-
- for (int i = 0; i < 100; i++) {
- String data = params + rnd.nextInt();
- stream.write(data.getBytes());
- }
- stream.flush();
- stream.close();
-
- URI uri = URI.create("http://127.0.0.1:" + pullServerService.getPort() + "/?" + params);
- FileChunk storeChunk = new FileChunk(new File(OUTPUT_DIR + "data"), 0, 0);
- storeChunk.setFromRemote(true);
- final Fetcher fetcher = new Fetcher(conf, uri, storeChunk);
- assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState());
-
- fetcher.get();
- assertEquals(TajoProtos.FetcherState.FETCH_FAILED, fetcher.getState());
- }
-
- @Test
- public void testServerFailure() throws Exception {
- QueryId queryId = QueryIdFactory.NULL_QUERY_ID;
- String sid = "1";
- String ta = "1_0";
- String partId = "1";
-
- String dataPath = INPUT_DIR + queryId.toString() + "/output"+ "/" + sid + "/" +ta + "/output/" + partId;
- String params = String.format("qid=%s&sid=%s&p=%s&type=%s&ta=%s", queryId, sid, partId, "h", ta);
-
- URI uri = URI.create("http://127.0.0.1:" + pullServerService.getPort() + "/?" + params);
- FileChunk storeChunk = new FileChunk(new File(OUTPUT_DIR + "data"), 0, 0);
- storeChunk.setFromRemote(true);
- final Fetcher fetcher = new Fetcher(conf, uri, storeChunk);
- assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState());
-
- pullServerService.stop();
-
- boolean failure = false;
- try{
- fetcher.get();
- } catch (Throwable e){
- failure = true;
- }
- assertTrue(failure);
- assertEquals(TajoProtos.FetcherState.FETCH_FAILED, fetcher.getState());
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestFetcherWithTajoPullServer.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestFetcherWithTajoPullServer.java b/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestFetcherWithTajoPullServer.java
new file mode 100644
index 0000000..8844fce
--- /dev/null
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestFetcherWithTajoPullServer.java
@@ -0,0 +1,437 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.service.Service;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.QueryIdFactory;
+import org.apache.tajo.TajoProtos;
+import org.apache.tajo.TajoProtos.FetcherState;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.pullserver.PullServerConstants;
+import org.apache.tajo.pullserver.PullServerUtil;
+import org.apache.tajo.pullserver.PullServerUtil.PullServerRequestURIBuilder;
+import org.apache.tajo.pullserver.TajoPullServerService;
+import org.apache.tajo.pullserver.retriever.FileChunk;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.index.bst.BSTIndex;
+import org.apache.tajo.storage.index.bst.BSTIndex.BSTIndexWriter;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.worker.FetchImpl.RangeParam;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.*;
+
+import static org.junit.Assert.*;
+
+@RunWith(Parameterized.class)
+public class TestFetcherWithTajoPullServer {
+ private enum FetchType {
+ LOCAL,
+ REMOTE
+ }
+ private enum PullServerType {
+ TAJO,
+ YARN
+ }
+
+ private final String TEST_DATA = TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/" +
+ TestFetcherWithTajoPullServer.class.getSimpleName();
+ private final String INPUT_DIR = TEST_DATA+"/in/";
+ private final String OUTPUT_DIR = TEST_DATA+"/out/";
+ private final TajoConf conf = new TajoConf();
+ private Service pullServerService;
+ private final int maxUrlLength = conf.getIntVar(ConfVars.PULLSERVER_FETCH_URL_MAX_LENGTH);
+ private final String TEST_TABLE_NAME = "test";
+ private final FetchType fetchType;
+ private final PullServerType pullServerType;
+ private int pullserverPort;
+
+ public TestFetcherWithTajoPullServer(FetchType fetchType, PullServerType pullServerType) {
+ this.fetchType = fetchType;
+ this.pullServerType = pullServerType;
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ CommonTestingUtil.getTestDir(TEST_DATA);
+ CommonTestingUtil.getTestDir(INPUT_DIR);
+ CommonTestingUtil.getTestDir(OUTPUT_DIR);
+ conf.setVar(TajoConf.ConfVars.WORKER_TEMPORAL_DIR, INPUT_DIR);
+ conf.setIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_READ_TIMEOUT, 1);
+ conf.setIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_CHUNK_MAX_SIZE, 127);
+
+ if (pullServerType.equals(PullServerType.TAJO)) {
+ pullServerService = new TajoPullServerService();
+ } else {
+ pullServerService = new org.apache.tajo.yarn.TajoPullServerService();
+ }
+ pullServerService.init(conf);
+ pullServerService.start();
+
+ if (pullServerType.equals(PullServerType.TAJO)) {
+ pullserverPort = ((TajoPullServerService)pullServerService).getPort();
+ } else {
+ pullserverPort = ((org.apache.tajo.yarn.TajoPullServerService)pullServerService).getPort();
+ }
+ }
+
+ @After
+ public void tearDown() {
+ pullServerService.stop();
+ }
+
+ @Parameters(name = "{index}: {0}, {1}")
+ public static Collection<Object[]> generateParameters() {
+ return Arrays.asList(new Object[][] {
+ {FetchType.LOCAL, PullServerType.TAJO},
+ {FetchType.REMOTE, PullServerType.TAJO},
+ {FetchType.LOCAL, PullServerType.YARN},
+ {FetchType.REMOTE, PullServerType.YARN}
+ });
+ }
+
+ private AbstractFetcher getFetcher(URI uri, File data) throws IOException {
+ if (fetchType.equals(FetchType.LOCAL)) {
+ return new LocalFetcher(conf, uri, TEST_TABLE_NAME);
+ } else {
+ FileChunk storeChunk = new FileChunk(data, 0, data.length());
+ storeChunk.setFromRemote(true);
+ return new RemoteFetcher(conf, uri, storeChunk);
+ }
+ }
+
+ /**
+  * Writes a hash-shuffle partition file and checks that fetching it yields a file of the
+  * same length and leaves the fetcher in FETCH_DATA_FINISHED.
+  */
+ @Test
+ public void testGetHashShuffle() throws IOException {
+   Random rnd = new Random();
+   QueryId queryId = QueryIdFactory.NULL_QUERY_ID;
+   String sid = "1";
+   String partId = "1";
+
+   Path queryBaseDir = PullServerUtil.getBaseOutputDir(queryId.toString(), sid);
+   final int partParentId = HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), conf);
+   final Path dataPath = StorageUtil.concatPath(queryBaseDir, "hash-shuffle", String.valueOf(partParentId), partId);
+
+   PullServerRequestURIBuilder builder = new PullServerRequestURIBuilder("127.0.0.1", pullserverPort,
+       maxUrlLength);
+   builder.setRequestType(PullServerConstants.CHUNK_REQUEST_PARAM_STRING)
+       .setQueryId(queryId.toString())
+       .setEbId(sid)
+       .setPartId(partId)
+       .setShuffleType(PullServerConstants.HASH_SHUFFLE_PARAM_STRING);
+
+   // Use the test's configuration for every local file system access.
+   FileSystem fs = FileSystem.getLocal(conf);
+   Path inputPath = new Path(INPUT_DIR, dataPath);
+   // try-with-resources so the file handle is released even if a write fails.
+   try (FSDataOutputStream stream = fs.create(inputPath, true)) {
+     for (int i = 0; i < 100; i++) {
+       String data = "" + rnd.nextInt();
+       stream.write(data.getBytes());
+     }
+     stream.flush();
+   }
+
+   URI uri = builder.build(false).get(0);
+   File data = new File(OUTPUT_DIR + "data");
+
+   final AbstractFetcher fetcher = getFetcher(uri, data);
+
+   FileChunk chunk = fetcher.get().get(0);
+   assertNotNull(chunk);
+   assertNotNull(chunk.getFile());
+
+   FileStatus inStatus = fs.getFileStatus(inputPath);
+   FileStatus outStatus = fs.getFileStatus(new Path(chunk.getFile().getAbsolutePath()));
+
+   assertEquals(inStatus.getLen(), outStatus.getLen());
+   assertEquals(FetcherState.FETCH_DATA_FINISHED, fetcher.getState());
+ }
+
+ @Test
+ public void testGetRangeShuffle() throws IOException {
+ Random rnd = new Random();
+ QueryId queryId = QueryIdFactory.NULL_QUERY_ID;
+ String sid = "1";
+ String partId = "1";
+ String taskId = "1";
+ String attemptId = "0";
+
+ Path queryBaseDir = PullServerUtil.getBaseOutputDir(queryId.toString(), sid);
+ Path outDir = StorageUtil.concatPath(queryBaseDir, taskId + "_" + attemptId, "output");
+ Path dataPath = StorageUtil.concatPath(outDir, "output");
+ Path indexPath = StorageUtil.concatPath(outDir, "index");
+
+ List<String> strings = new ArrayList<>(100);
+ for (int i = 0; i < 100; i++) {
+ strings.add("" + rnd.nextInt());
+ }
+ Collections.sort(strings);
+
+ Path inputPath = new Path(INPUT_DIR, dataPath);
+ FileSystem fs = FileSystem.getLocal(conf);
+ if (fs.exists(outDir)) {
+ fs.delete(outDir, true);
+ }
+ final FSDataOutputStream stream = fs.create(inputPath, true);
+ BSTIndex index = new BSTIndex(conf);
+ Schema schema = SchemaBuilder.builder().addAll(new Column[] {new Column("rnd", Type.TEXT)}).build();
+ SortSpec[] sortSpecs = new SortSpec[] {new SortSpec(schema.getColumn(0))};
+ BSTIndexWriter writer = index.getIndexWriter(new Path(INPUT_DIR, indexPath), BSTIndex.TWO_LEVEL_INDEX, schema, new BaseTupleComparator(schema, sortSpecs), true);
+ writer.init();
+
+ for (String t : strings) {
+ writer.write(new VTuple(new Datum[] {DatumFactory.createText(t)}), stream.getPos());
+ stream.write(t.getBytes());
+ }
+ stream.flush();
+ writer.flush();
+ stream.close();
+ writer.close();
+
+ RangeParam rangeParam = new RangeParam(new TupleRange(sortSpecs,
+ new VTuple(new Datum[] {DatumFactory.createText(strings.get(0))}),
+ new VTuple(new Datum[] {DatumFactory.createText(strings.get(strings.size() - 1))})), true, RowStoreUtil.createEncoder(schema));
+ PullServerRequestURIBuilder builder = new PullServerRequestURIBuilder("127.0.0.1", pullserverPort,
+ maxUrlLength);
+ builder.setRequestType(PullServerConstants.CHUNK_REQUEST_PARAM_STRING)
+ .setQueryId(queryId.toString())
+ .setEbId(sid)
+ .setPartId(partId)
+ .setShuffleType(PullServerConstants.RANGE_SHUFFLE_PARAM_STRING)
+ .setTaskIds(Lists.newArrayList(Integer.parseInt(taskId)))
+ .setAttemptIds(Lists.newArrayList(Integer.parseInt(attemptId)))
+ .setStartKeyBase64(new String(Base64.encodeBase64(rangeParam.getStart())))
+ .setEndKeyBase64(new String(Base64.encodeBase64(rangeParam.getEnd())))
+ .setLastInclude(true);
+
+ URI uri = builder.build(true).get(0);
+ File data = new File(OUTPUT_DIR + "data");
+
+ final AbstractFetcher fetcher = getFetcher(uri, data);
+
+ FileChunk chunk = fetcher.get().get(0);
+ assertNotNull(chunk);
+ assertNotNull(chunk.getFile());
+
+ FileStatus inStatus = fs.getFileStatus(inputPath);
+ FileStatus outStatus = fs.getFileStatus(new Path(chunk.getFile().getAbsolutePath()));
+
+ assertEquals(inStatus.getLen(), outStatus.getLen());
+ assertEquals(FetcherState.FETCH_DATA_FINISHED, fetcher.getState());
+ }
+
+ @Test
+ public void testAdjustFetchProcess() {
+   // The arguments appear to be (total, remaining) fetch counts: progress is 0 with nothing
+   // fetched or nothing to fetch, and rises linearly to 0.5 once every fetch is done.
+   final float exactly = 0f;
+   assertEquals(0.0f, TaskImpl.adjustFetchProcess(0, 0), exactly);
+   assertEquals(0.0f, TaskImpl.adjustFetchProcess(10, 10), exactly);
+   assertEquals(0.05f, TaskImpl.adjustFetchProcess(10, 9), exactly);
+   assertEquals(0.1f, TaskImpl.adjustFetchProcess(10, 8), exactly);
+   assertEquals(0.25f, TaskImpl.adjustFetchProcess(10, 5), exactly);
+   assertEquals(0.45f, TaskImpl.adjustFetchProcess(10, 1), exactly);
+   assertEquals(0.5f, TaskImpl.adjustFetchProcess(10, 0), exactly);
+ }
+
+ /**
+  * Checks the fetcher state transitions of a successful hash-shuffle fetch:
+  * FETCH_INIT before {@code get()}, FETCH_DATA_FINISHED afterwards.
+  */
+ @Test
+ public void testStatus() throws Exception {
+   Random rnd = new Random();
+   QueryId queryId = QueryIdFactory.NULL_QUERY_ID;
+   String sid = "1";
+   String ta = "1_0";
+   String partId = "1";
+
+   Path queryBaseDir = PullServerUtil.getBaseOutputDir(queryId.toString(), sid);
+   final int partParentId = HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), conf);
+   final Path dataPath = StorageUtil.concatPath(queryBaseDir, "hash-shuffle", String.valueOf(partParentId), partId);
+
+   PullServerRequestURIBuilder builder = new PullServerRequestURIBuilder("127.0.0.1", pullserverPort,
+       maxUrlLength);
+   builder.setRequestType(PullServerConstants.CHUNK_REQUEST_PARAM_STRING)
+       .setQueryId(queryId.toString())
+       .setEbId(sid)
+       .setPartId(partId)
+       .setShuffleType(PullServerConstants.HASH_SHUFFLE_PARAM_STRING)
+       .setTaskAttemptIds(Lists.newArrayList(ta));
+
+   // try-with-resources so the file handle is released even if a write fails.
+   try (FSDataOutputStream stream = FileSystem.getLocal(conf).create(new Path(INPUT_DIR, dataPath), true)) {
+     for (int i = 0; i < 100; i++) {
+       String data = "" + rnd.nextInt();
+       stream.write(data.getBytes());
+     }
+     stream.flush();
+   }
+
+   URI uri = builder.build(true).get(0);
+   File data = new File(OUTPUT_DIR + "data");
+   final AbstractFetcher fetcher = getFetcher(uri, data);
+   assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState());
+
+   fetcher.get();
+   assertEquals(FetcherState.FETCH_DATA_FINISHED, fetcher.getState());
+ }
+
+ @Test
+ public void testNoContentFetch() throws Exception {
+
+ QueryId queryId = QueryIdFactory.NULL_QUERY_ID;
+ String sid = "1";
+ String ta = "1_0";
+ String partId = "1";
+
+ Path queryBaseDir = PullServerUtil.getBaseOutputDir(queryId.toString(), sid);
+ final int partParentId = HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), conf);
+ final Path dataPath = StorageUtil.concatPath(queryBaseDir, "hash-shuffle", String.valueOf(partParentId), partId);
+
+ PullServerRequestURIBuilder builder = new PullServerRequestURIBuilder("127.0.0.1", pullserverPort,
+ maxUrlLength);
+ builder.setRequestType(PullServerConstants.CHUNK_REQUEST_PARAM_STRING)
+ .setQueryId(queryId.toString())
+ .setEbId(sid)
+ .setPartId(partId)
+ .setShuffleType(PullServerConstants.HASH_SHUFFLE_PARAM_STRING)
+ .setTaskAttemptIds(Lists.newArrayList(ta));
+
+ Path inputPath = new Path(INPUT_DIR, dataPath);
+ FileSystem fs = FileSystem.getLocal(conf);
+ if(fs.exists(inputPath)){
+ fs.delete(inputPath, true);
+ }
+
+ FSDataOutputStream stream = fs.create(inputPath, true);
+ stream.close();
+
+ URI uri = builder.build(true).get(0);
+ File data = new File(OUTPUT_DIR + "data");
+ final AbstractFetcher fetcher = getFetcher(uri, data);
+ assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState());
+
+ try {
+ fetcher.get();
+ if (fetchType.equals(FetchType.LOCAL)) {
+ fail();
+ }
+ } catch (IOException e) {
+ if (fetchType.equals(FetchType.REMOTE)) {
+ fail();
+ }
+ }
+ assertEquals(FetcherState.FETCH_FAILED, fetcher.getState());
+ }
+
+ /**
+  * Requests an unknown shuffle type. A local fetch is expected to throw an
+  * IllegalArgumentException; a remote fetch is expected not to. In both cases the fetcher
+  * must end in FETCH_FAILED.
+  */
+ @Test
+ public void testFailureStatus() throws Exception {
+   Random rnd = new Random();
+
+   QueryId queryId = QueryIdFactory.NULL_QUERY_ID;
+   String sid = "1";
+   String ta = "1_0";
+   String partId = "1";
+
+   Path queryBaseDir = PullServerUtil.getBaseOutputDir(queryId.toString(), sid);
+   final int partParentId = HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), conf);
+   final Path dataPath = StorageUtil.concatPath(queryBaseDir, "hash-shuffle", String.valueOf(partParentId), partId);
+
+   PullServerRequestURIBuilder builder = new PullServerRequestURIBuilder("127.0.0.1", pullserverPort,
+       maxUrlLength);
+   builder.setRequestType(PullServerConstants.CHUNK_REQUEST_PARAM_STRING)
+       .setQueryId(queryId.toString())
+       .setEbId(sid)
+       .setPartId(partId)
+       .setShuffleType("x") // unknown shuffle type: TajoPullServerService responds with BAD_REQUEST
+       .setTaskAttemptIds(Lists.newArrayList(ta));
+
+   // try-with-resources so the file handle is released even if a write fails.
+   try (FSDataOutputStream stream = FileSystem.getLocal(conf).create(new Path(INPUT_DIR, dataPath), true)) {
+     for (int i = 0; i < 100; i++) {
+       String data = "" + rnd.nextInt();
+       stream.write(data.getBytes());
+     }
+     stream.flush();
+   }
+
+   URI uri = builder.build(true).get(0);
+   File data = new File(OUTPUT_DIR + "data");
+   final AbstractFetcher fetcher = getFetcher(uri, data);
+   assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState());
+
+   try {
+     fetcher.get();
+     if (fetchType.equals(FetchType.LOCAL)) {
+       fail();
+     }
+   } catch (IllegalArgumentException e) {
+     if (!fetchType.equals(FetchType.LOCAL)) {
+       fail();
+     }
+   }
+   assertEquals(TajoProtos.FetcherState.FETCH_FAILED, fetcher.getState());
+ }
+
+ /**
+  * Stops the pull server before fetching and checks that the fetch throws an IOException
+  * and leaves the fetcher in FETCH_FAILED.
+  */
+ @Test
+ public void testServerFailure() throws Exception {
+   QueryId queryId = QueryIdFactory.NULL_QUERY_ID;
+   String ebId = "1";
+   String taskAttempt = "1_0";
+   String partitionId = "1";
+
+   PullServerRequestURIBuilder requestBuilder = new PullServerRequestURIBuilder("127.0.0.1", pullserverPort,
+       maxUrlLength);
+   requestBuilder.setRequestType(PullServerConstants.CHUNK_REQUEST_PARAM_STRING)
+       .setQueryId(queryId.toString())
+       .setEbId(ebId)
+       .setPartId(partitionId)
+       .setShuffleType(PullServerConstants.HASH_SHUFFLE_PARAM_STRING)
+       .setTaskAttemptIds(Lists.newArrayList(taskAttempt));
+
+   final AbstractFetcher fetcher = getFetcher(requestBuilder.build(true).get(0), new File(OUTPUT_DIR + "data"));
+   assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState());
+
+   pullServerService.stop();
+
+   try {
+     fetcher.get();
+     fail();
+   } catch (IOException expected) {
+     // expected: the pull server has been stopped
+   }
+   assertEquals(TajoProtos.FetcherState.FETCH_FAILED, fetcher.getState());
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestTaskExecutor.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestTaskExecutor.java b/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestTaskExecutor.java
index 45e430e..df5b3c8 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestTaskExecutor.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestTaskExecutor.java
@@ -305,7 +305,7 @@ public class TestTaskExecutor {
}
+ // Test stub: no fetchers are tracked, so this returns null.
@Override
- public List<Fetcher> getFetchers() {
+ public List<AbstractFetcher> getFetchers() {
return null;
}
};
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
index 2a688e5..ba051a3 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
@@ -50,6 +50,8 @@ import org.apache.tajo.plan.logical.SortNode.SortPurpose;
import org.apache.tajo.plan.serder.PlanProto.DistinctGroupbyEnforcer.MultipleAggregationStage;
import org.apache.tajo.plan.serder.PlanProto.EnforceProperty;
import org.apache.tajo.plan.util.PlannerUtil;
+import org.apache.tajo.pullserver.PullServerConstants;
+import org.apache.tajo.pullserver.PullServerUtil.PullServerRequestURIBuilder;
import org.apache.tajo.querymaster.Task.IntermediateEntry;
import org.apache.tajo.querymaster.Task.PullHost;
import org.apache.tajo.storage.*;
@@ -70,7 +72,6 @@ import java.net.URLEncoder;
import java.util.*;
import java.util.Map.Entry;
import java.util.stream.Collectors;
-import java.util.stream.IntStream;
import static org.apache.tajo.plan.serder.PlanProto.ShuffleType;
import static org.apache.tajo.plan.serder.PlanProto.ShuffleType.*;
@@ -1124,89 +1125,32 @@ public class Repartitioner {
}
+ /**
+ * Builds the pull server request URIs for the given fetch.
+ *
+ * @param maxUrlLength maximum URL length handed to {@link PullServerRequestURIBuilder}; presumably
+ * used to split long task id lists across several URIs - TODO confirm in the builder
+ * @param fetch the fetch to build requests for (host, port, execution block, partition, shuffle
+ * type, optional range keys and offset/length)
+ * @param includeParts whether the task and attempt ids of the fetch are included in the request
+ * @return the URIs returned by {@link PullServerRequestURIBuilder#build(boolean)}
+ */
public static List<URI> createFetchURL(int maxUrlLength, FetchProto fetch, boolean includeParts) {
- String scheme = "http://";
-
- StringBuilder urlPrefix = new StringBuilder(scheme);
+ PullServerRequestURIBuilder builder =
+ new PullServerRequestURIBuilder(fetch.getHost(), fetch.getPort(), maxUrlLength);
ExecutionBlockId ebId = new ExecutionBlockId(fetch.getExecutionBlockId());
- urlPrefix.append(fetch.getHost()).append(":").append(fetch.getPort()).append("/?")
- .append("qid=").append(ebId.getQueryId().toString())
- .append("&sid=").append(ebId.getId())
- .append("&p=").append(fetch.getPartitionId())
- .append("&type=");
+ builder.setRequestType(PullServerConstants.CHUNK_REQUEST_PARAM_STRING)
+ .setQueryId(ebId.getQueryId().toString())
+ .setEbId(ebId.getId())
+ .setPartId(fetch.getPartitionId());
+
if (fetch.getType() == HASH_SHUFFLE) {
- urlPrefix.append("h");
+ builder.setShuffleType(PullServerConstants.HASH_SHUFFLE_PARAM_STRING);
} else if (fetch.getType() == RANGE_SHUFFLE) {
- urlPrefix.append("r").append("&").append(getRangeParam(fetch));
+ builder.setShuffleType(PullServerConstants.RANGE_SHUFFLE_PARAM_STRING);
+ // The range start/end keys are Base64-encoded before being placed in the request.
+ builder.setStartKeyBase64(new String(org.apache.commons.codec.binary.Base64.encodeBase64(fetch.getRangeStart().toByteArray())));
+ builder.setEndKeyBase64(new String(org.apache.commons.codec.binary.Base64.encodeBase64(fetch.getRangeEnd().toByteArray())));
+ builder.setLastInclude(fetch.getRangeLastInclusive());
} else if (fetch.getType() == SCATTERED_HASH_SHUFFLE) {
- urlPrefix.append("s");
+ builder.setShuffleType(PullServerConstants.SCATTERED_HASH_SHUFFLE_PARAM_STRING);
}
-
if (fetch.getLength() >= 0) {
- urlPrefix.append("&offset=").append(fetch.getOffset()).append("&length=").append(fetch.getLength());
+ builder.setOffset(fetch.getOffset()).setLength(fetch.getLength());
}
-
- List<URI> fetchURLs = new ArrayList<>();
- if(includeParts) {
- if (fetch.getType() == HASH_SHUFFLE || fetch.getType() == SCATTERED_HASH_SHUFFLE) {
- fetchURLs.add(URI.create(urlPrefix.toString()));
- } else {
- urlPrefix.append("&ta=");
- // If the get request is longer than 2000 characters,
- // the long request uri may cause HTTP Status Code - 414 Request-URI Too Long.
- // Refer to http://www.w3.org/Protocols/rfc2616/rfc2616-sec10.html#sec10.4.15
- // The below code transforms a long request to multiple requests.
- List<String> taskIdsParams = new ArrayList<>();
- StringBuilder taskIdListBuilder = new StringBuilder();
-
- final List<Integer> taskIds = fetch.getTaskIdList();
- final List<Integer> attemptIds = fetch.getAttemptIdList();
-
- // Sort task ids to increase cache hit in pull server
- final List<Pair<Integer, Integer>> taskAndAttemptIds = IntStream.range(0, taskIds.size())
- .mapToObj(i -> new Pair<>(taskIds.get(i), attemptIds.get(i)))
- .sorted((p1, p2) -> p1.getFirst() - p2.getFirst())
- .collect(Collectors.toList());
-
- boolean first = true;
-
- for (int i = 0; i < taskAndAttemptIds.size(); i++) {
- StringBuilder taskAttemptId = new StringBuilder();
-
- if (!first) { // when comma is added?
- taskAttemptId.append(",");
- } else {
- first = false;
- }
-
- int taskId = taskAndAttemptIds.get(i).getFirst();
- if (taskId < 0) {
- // In the case of hash shuffle each partition has single shuffle file per worker.
- // TODO If file is large, consider multiple fetching(shuffle file can be split)
- continue;
- }
- int attemptId = taskAndAttemptIds.get(i).getSecond();
- taskAttemptId.append(taskId).append("_").append(attemptId);
-
- if (urlPrefix.length() + taskIdListBuilder.length() > maxUrlLength) {
- taskIdsParams.add(taskIdListBuilder.toString());
- taskIdListBuilder = new StringBuilder(taskId + "_" + attemptId);
- } else {
- taskIdListBuilder.append(taskAttemptId);
- }
- }
- // if the url params remain
- if (taskIdListBuilder.length() > 0) {
- taskIdsParams.add(taskIdListBuilder.toString());
- }
- for (String param : taskIdsParams) {
- fetchURLs.add(URI.create(urlPrefix + param));
- }
- }
- } else {
- fetchURLs.add(URI.create(urlPrefix.toString()));
+ if (includeParts) {
+ builder.setTaskIds(fetch.getTaskIdList());
+ builder.setAttemptIds(fetch.getAttemptIdList());
}
-
- return fetchURLs;
+ return builder.build(includeParts);
}
public static Map<Integer, List<IntermediateEntry>> hashByKey(List<IntermediateEntry> entries) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-core/src/main/java/org/apache/tajo/worker/AbstractFetcher.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/AbstractFetcher.java b/tajo-core/src/main/java/org/apache/tajo/worker/AbstractFetcher.java
new file mode 100644
index 0000000..a12db77
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/AbstractFetcher.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import org.apache.tajo.TajoProtos;
+import org.apache.tajo.TajoProtos.FetcherState;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.pullserver.retriever.FileChunk;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+
+/**
+ * Base class for shuffle fetchers.
+ *
+ * <p>Holds the request {@link URI}, the configuration, an optional {@link FileChunk} and the
+ * bookkeeping (state, start/finish time, file count and length, received message count) exposed
+ * through the getters. Subclasses implement {@link #get()} and can use
+ * {@link #endFetch(FetcherState)} to record the final state together with the finish time.
+ */
+public abstract class AbstractFetcher {
+
+  protected final URI uri;
+  /** Chunk associated with the fetched data; {@code null} unless supplied to the constructor. */
+  protected FileChunk fileChunk;
+  protected final TajoConf conf;
+
+  /**
+   * Current fetch state, starting at {@code FETCH_INIT}. Volatile (like {@link #finishTime})
+   * because a fetch may update it from a network I/O thread while another thread polls
+   * {@link #getState()}.
+   */
+  protected volatile TajoProtos.FetcherState state;
+
+  protected long startTime;
+  protected volatile long finishTime;
+  protected int fileNum;
+  protected long fileLen;
+  protected int messageReceiveCount;
+
+  /**
+   * Creates a fetcher without an associated file chunk.
+   *
+   * @param conf the Tajo configuration
+   * @param uri the URI to fetch from
+   */
+  public AbstractFetcher(TajoConf conf, URI uri) {
+    this(conf, uri, null);
+  }
+
+  /**
+   * Creates a fetcher in the {@code FETCH_INIT} state.
+   *
+   * @param conf the Tajo configuration
+   * @param uri the URI to fetch from
+   * @param fileChunk the chunk associated with the fetched data; may be {@code null}
+   */
+  public AbstractFetcher(TajoConf conf, URI uri, FileChunk fileChunk) {
+    this.conf = conf;
+    this.uri = uri;
+    this.fileChunk = fileChunk;
+    this.state = TajoProtos.FetcherState.FETCH_INIT;
+  }
+
+  /** @return the URI this fetcher reads from */
+  public URI getURI() {
+    return this.uri;
+  }
+
+  /** @return the start time recorded by the fetch implementation */
+  public long getStartTime() {
+    return startTime;
+  }
+
+  /** @return the finish time in milliseconds, as recorded by {@link #endFetch(FetcherState)} or a subclass */
+  public long getFinishTime() {
+    return finishTime;
+  }
+
+  /** @return the length of the fetched data as recorded by the fetch implementation */
+  public long getFileLen() {
+    return fileLen;
+  }
+
+  /** @return the number of files recorded by the fetch implementation */
+  public int getFileNum() {
+    return fileNum;
+  }
+
+  /** @return the current fetch state */
+  public TajoProtos.FetcherState getState() {
+    return state;
+  }
+
+  /** @return the number of messages received, as counted by the fetch implementation */
+  public int getMessageReceiveCount() {
+    return messageReceiveCount;
+  }
+
+  /**
+   * Performs the fetch.
+   *
+   * @return the fetched file chunks
+   * @throws IOException if the fetch fails
+   */
+  public abstract List<FileChunk> get() throws IOException;
+
+  /**
+   * Records the current time as the finish time, then sets the final state.
+   *
+   * @param state the final fetch state
+   */
+  protected void endFetch(FetcherState state) {
+    this.finishTime = System.currentTimeMillis();
+    this.state = state;
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
index e675d70..4ab6627 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
@@ -37,6 +37,7 @@ import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.TajoProtos;
import org.apache.tajo.TaskAttemptId;
import org.apache.tajo.TaskId;
+import org.apache.tajo.annotation.Nullable;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.exception.ErrorUtil;
@@ -44,6 +45,7 @@ import org.apache.tajo.exception.TajoInternalError;
import org.apache.tajo.ipc.QueryMasterProtocol;
import org.apache.tajo.master.cluster.WorkerConnectionInfo;
import org.apache.tajo.plan.serder.PlanProto;
+import org.apache.tajo.pullserver.PullServerUtil;
import org.apache.tajo.pullserver.TajoPullServerService;
import org.apache.tajo.rpc.*;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
@@ -103,7 +105,7 @@ public class ExecutionBlockContext {
private final Map<TaskId, TaskHistory> taskHistories = Maps.newConcurrentMap();
public ExecutionBlockContext(TajoWorker.WorkerContext workerContext, ExecutionBlockContextResponse request,
- AsyncRpcClient queryMasterClient)
+ AsyncRpcClient queryMasterClient, @Nullable TajoPullServerService pullServerService)
throws IOException {
this.executionBlockId = new ExecutionBlockId(request.getExecutionBlockId());
this.connManager = RpcClientManager.getInstance();
@@ -117,7 +119,7 @@ public class ExecutionBlockContext {
this.queryEngine = new TajoQueryEngine(systemConf);
this.queryContext = new QueryContext(workerContext.getConf(), request.getQueryContext());
this.plan = request.getPlanJson();
- this.resource = new ExecutionBlockSharedResource();
+ this.resource = new ExecutionBlockSharedResource(pullServerService);
this.workerContext = workerContext;
this.shuffleType = request.getShuffleType();
this.queryMasterClient = queryMasterClient;
@@ -281,12 +283,12 @@ public class ExecutionBlockContext {
}
+ /**
+ * Returns the base output directory of the given execution block, resolved by
+ * {@code PullServerUtil.getBaseOutputDir} from the query id and the block's numeric id.
+ */
public static Path getBaseOutputDir(ExecutionBlockId executionBlockId) {
- return TajoPullServerService.getBaseOutputDir(
+ return PullServerUtil.getBaseOutputDir(
executionBlockId.getQueryId().toString(), String.valueOf(executionBlockId.getId()));
}
+ /**
+ * Returns the base input directory of the given execution block, resolved by
+ * {@code PullServerUtil.getBaseInputDir} from the query id and the block id's string form.
+ */
public static Path getBaseInputDir(ExecutionBlockId executionBlockId) {
- return TajoPullServerService.getBaseInputDir(executionBlockId.getQueryId().toString(), executionBlockId.toString());
+ return PullServerUtil.getBaseInputDir(executionBlockId.getQueryId().toString(), executionBlockId.toString());
}
public ExecutionBlockId getExecutionBlockId() {
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockSharedResource.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockSharedResource.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockSharedResource.java
index 660f875..e1ff917 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockSharedResource.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockSharedResource.java
@@ -23,6 +23,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.SessionVars;
+import org.apache.tajo.annotation.Nullable;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.engine.codegen.ExecutorPreCompiler;
import org.apache.tajo.engine.codegen.TajoClassLoader;
@@ -35,8 +36,10 @@ import org.apache.tajo.exception.TajoException;
import org.apache.tajo.exception.UnsupportedException;
import org.apache.tajo.plan.expr.EvalNode;
import org.apache.tajo.plan.logical.LogicalNode;
+import org.apache.tajo.pullserver.TajoPullServerService;
import org.apache.tajo.util.Pair;
+import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
public class ExecutionBlockSharedResource {
@@ -53,6 +56,15 @@ public class ExecutionBlockSharedResource {
private ExecutorPreCompiler.CompilationContext compilationContext;
private LogicalNode plan;
private boolean codeGenEnabled = false;
+ private final TajoPullServerService pullServerService;
+
+ /**
+ * Creates a shared resource without a pull server service.
+ */
+ public ExecutionBlockSharedResource() {
+ this(null);
+ }
+
+ /**
+ * Creates a shared resource that exposes the given pull server service.
+ *
+ * @param pullServerService the service returned by {@code getPullServerService()}, or {@code null} if none
+ */
+ public ExecutionBlockSharedResource(@Nullable TajoPullServerService pullServerService) {
+ this.pullServerService = pullServerService;
+ }
public void initialize(final QueryContext context, final String planJson) {
@@ -130,6 +142,10 @@ public class ExecutionBlockSharedResource {
TableCache.getInstance().releaseCache(id);
}
+ /**
+ * Returns the pull server service supplied at construction time, if any.
+ *
+ * @return the pull server service, or {@link Optional#empty()} when none was supplied
+ */
+ public Optional<TajoPullServerService> getPullServerService() {
+   return Optional.ofNullable(pullServerService);
+ }
+
public void release() {
compilationContext = null;
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java b/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
deleted file mode 100644
index 250b4cc..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
+++ /dev/null
@@ -1,356 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.worker;
-
-import io.netty.bootstrap.Bootstrap;
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.*;
-import io.netty.channel.socket.nio.NioSocketChannel;
-import io.netty.handler.codec.http.*;
-import io.netty.handler.timeout.ReadTimeoutException;
-import io.netty.handler.timeout.ReadTimeoutHandler;
-import io.netty.util.ReferenceCountUtil;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.tajo.TajoProtos;
-import org.apache.tajo.TajoProtos.FetcherState;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.pullserver.TajoPullServerService;
-import org.apache.tajo.pullserver.retriever.FileChunk;
-import org.apache.tajo.rpc.NettyUtils;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.net.InetSocketAddress;
-import java.net.URI;
-import java.nio.channels.FileChannel;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Fetcher fetches data from a given uri via HTTP protocol and stores them into
- * a specific file. It aims at asynchronous and efficient data transmit.
- */
-public class Fetcher {
-
- private final static Log LOG = LogFactory.getLog(Fetcher.class);
-
- private final URI uri;
- private final FileChunk fileChunk;
- private final TajoConf conf;
-
- private final String host;
- private int port;
- private final boolean useLocalFile;
-
- private long startTime;
- private volatile long finishTime;
- private long fileLen;
- private int messageReceiveCount;
- private TajoProtos.FetcherState state;
-
- private Bootstrap bootstrap;
- private List<Long> chunkLengths = new ArrayList<>();
-
- public Fetcher(TajoConf conf, URI uri, FileChunk chunk) {
- this.uri = uri;
- this.fileChunk = chunk;
- this.useLocalFile = !chunk.fromRemote();
- this.state = TajoProtos.FetcherState.FETCH_INIT;
- this.conf = conf;
-
- String scheme = uri.getScheme() == null ? "http" : uri.getScheme();
- this.host = uri.getHost() == null ? "localhost" : uri.getHost();
- this.port = uri.getPort();
- if (port == -1) {
- if (scheme.equalsIgnoreCase("http")) {
- this.port = 80;
- } else if (scheme.equalsIgnoreCase("https")) {
- this.port = 443;
- }
- }
-
- if (!useLocalFile) {
- bootstrap = new Bootstrap()
- .group(
- NettyUtils.getSharedEventLoopGroup(NettyUtils.GROUP.FETCHER,
- conf.getIntVar(TajoConf.ConfVars.SHUFFLE_RPC_CLIENT_WORKER_THREAD_NUM)))
- .channel(NioSocketChannel.class)
- .option(ChannelOption.ALLOCATOR, NettyUtils.ALLOCATOR)
- .option(ChannelOption.CONNECT_TIMEOUT_MILLIS,
- conf.getIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_CONNECT_TIMEOUT) * 1000)
- .option(ChannelOption.SO_RCVBUF, 1048576) // set 1M
- .option(ChannelOption.TCP_NODELAY, true);
- }
- }
-
- public long getStartTime() {
- return startTime;
- }
-
- public long getFinishTime() {
- return finishTime;
- }
-
- public long getFileLen() {
- return fileLen;
- }
-
- public TajoProtos.FetcherState getState() {
- return state;
- }
-
- public int getMessageReceiveCount() {
- return messageReceiveCount;
- }
-
- public List<FileChunk> get() throws IOException {
- List<FileChunk> fileChunks = new ArrayList<>();
- if (useLocalFile) {
- startTime = System.currentTimeMillis();
- finishTime = System.currentTimeMillis();
- state = TajoProtos.FetcherState.FETCH_FINISHED;
- fileChunks.add(fileChunk);
- fileLen = fileChunk.getFile().length();
- return fileChunks;
- }
-
- if (state == FetcherState.FETCH_INIT) {
- ChannelInitializer<Channel> initializer = new HttpClientChannelInitializer(fileChunk.getFile());
- bootstrap.handler(initializer);
- }
-
- this.startTime = System.currentTimeMillis();
- this.state = TajoProtos.FetcherState.FETCH_FETCHING;
- ChannelFuture future = null;
- try {
- future = bootstrap.clone().connect(new InetSocketAddress(host, port))
- .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
-
- // Wait until the connection attempt succeeds or fails.
- Channel channel = future.awaitUninterruptibly().channel();
- if (!future.isSuccess()) {
- state = TajoProtos.FetcherState.FETCH_FAILED;
- throw new IOException(future.cause());
- }
-
- String query = uri.getPath()
- + (uri.getRawQuery() != null ? "?" + uri.getRawQuery() : "");
- // Prepare the HTTP request.
- HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, query);
- request.headers().set(HttpHeaders.Names.HOST, host);
- request.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
- request.headers().set(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP);
-
- if(LOG.isDebugEnabled()) {
- LOG.debug("Status: " + getState() + ", URI:" + uri);
- }
- // Send the HTTP request.
- channel.writeAndFlush(request);
-
- // Wait for the server to close the connection. throw exception if failed
- channel.closeFuture().syncUninterruptibly();
-
- fileChunk.setLength(fileChunk.getFile().length());
-
- long start = 0;
- for (Long eachChunkLength : chunkLengths) {
- if (eachChunkLength == 0) continue;
- FileChunk chunk = new FileChunk(fileChunk.getFile(), start, eachChunkLength);
- chunk.setEbId(fileChunk.getEbId());
- chunk.setFromRemote(fileChunk.fromRemote());
- fileChunks.add(chunk);
- start += eachChunkLength;
- }
- return fileChunks;
-
- } finally {
- if(future != null && future.channel().isOpen()){
- // Close the channel to exit.
- future.channel().close().awaitUninterruptibly();
- }
-
- this.finishTime = System.currentTimeMillis();
- long elapsedMills = finishTime - startTime;
- String transferSpeed;
- if(elapsedMills > 1000) {
- long bytePerSec = (fileChunk.length() * 1000) / elapsedMills;
- transferSpeed = FileUtils.byteCountToDisplaySize(bytePerSec);
- } else {
- transferSpeed = FileUtils.byteCountToDisplaySize(Math.max(fileChunk.length(), 0));
- }
-
- LOG.info(String.format("Fetcher :%d ms elapsed. %s/sec, len:%d, state:%s, URL:%s",
- elapsedMills, transferSpeed, fileChunk.length(), getState(), uri));
- }
- }
-
- public URI getURI() {
- return this.uri;
- }
-
- class HttpClientHandler extends ChannelInboundHandlerAdapter {
- private final File file;
- private RandomAccessFile raf;
- private FileChannel fc;
- private long length = -1;
-
- public HttpClientHandler(File file) throws FileNotFoundException {
- this.file = file;
- this.raf = new RandomAccessFile(file, "rw");
- this.fc = raf.getChannel();
- }
-
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg)
- throws Exception {
-
- messageReceiveCount++;
- if (msg instanceof HttpResponse) {
- try {
- HttpResponse response = (HttpResponse) msg;
-
- StringBuilder sb = new StringBuilder();
- if (LOG.isDebugEnabled()) {
- sb.append("STATUS: ").append(response.getStatus()).append(", VERSION: ")
- .append(response.getProtocolVersion()).append(", HEADER: ");
- }
- if (!response.headers().names().isEmpty()) {
- for (String name : response.headers().names()) {
- for (String value : response.headers().getAll(name)) {
- if (LOG.isDebugEnabled()) {
- sb.append(name).append(" = ").append(value);
- }
- if (this.length == -1 && name.equals("Content-Length")) {
- this.length = Long.parseLong(value);
- }
- }
- }
- if (response.headers().contains(TajoPullServerService.CHUNK_LENGTH_HEADER_NAME)) {
- String stringOffset = response.headers().get(TajoPullServerService.CHUNK_LENGTH_HEADER_NAME);
-
- for (String eachSplit : stringOffset.split(",")) {
- chunkLengths.add(Long.parseLong(eachSplit));
- }
- }
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug(sb.toString());
- }
-
- if (response.getStatus().code() == HttpResponseStatus.NO_CONTENT.code()) {
- LOG.warn("There are no data corresponding to the request");
- length = 0;
- return;
- } else if (response.getStatus().code() != HttpResponseStatus.OK.code()) {
- LOG.error(response.getStatus().reasonPhrase());
- state = TajoProtos.FetcherState.FETCH_FAILED;
- return;
- }
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
- } finally {
- ReferenceCountUtil.release(msg);
- }
- }
-
- if (msg instanceof HttpContent) {
- try {
- HttpContent httpContent = (HttpContent) msg;
- ByteBuf content = httpContent.content();
- if (content.isReadable()) {
- content.readBytes(fc, content.readableBytes());
- }
-
- if (msg instanceof LastHttpContent) {
- if (raf != null) {
- fileLen = file.length();
- }
-
- finishTime = System.currentTimeMillis();
- if (state != TajoProtos.FetcherState.FETCH_FAILED) {
- state = TajoProtos.FetcherState.FETCH_FINISHED;
- }
-
- IOUtils.cleanup(LOG, fc, raf);
- }
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
- } finally {
- ReferenceCountUtil.release(msg);
- }
- }
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
- throws Exception {
- if (cause instanceof ReadTimeoutException) {
- LOG.warn(cause.getMessage(), cause);
- } else {
- LOG.error("Fetch failed :", cause);
- }
-
- // this fetching will be retry
- IOUtils.cleanup(LOG, fc, raf);
- finishTime = System.currentTimeMillis();
- state = TajoProtos.FetcherState.FETCH_FAILED;
- ctx.close();
- }
-
- @Override
- public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
- if(getState() != TajoProtos.FetcherState.FETCH_FINISHED){
- //channel is closed, but cannot complete fetcher
- finishTime = System.currentTimeMillis();
- LOG.error("Channel closed by peer: " + ctx.channel());
- state = TajoProtos.FetcherState.FETCH_FAILED;
- }
- IOUtils.cleanup(LOG, fc, raf);
-
- super.channelUnregistered(ctx);
- }
- }
-
- class HttpClientChannelInitializer extends ChannelInitializer<Channel> {
- private final File file;
-
- public HttpClientChannelInitializer(File file) {
- this.file = file;
- }
-
- @Override
- protected void initChannel(Channel channel) throws Exception {
- ChannelPipeline pipeline = channel.pipeline();
-
- int maxChunkSize = conf.getIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_CHUNK_MAX_SIZE);
- int readTimeout = conf.getIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_READ_TIMEOUT);
-
- pipeline.addLast("codec", new HttpClientCodec(4096, 8192, maxChunkSize));
- pipeline.addLast("inflater", new HttpContentDecompressor());
- pipeline.addLast("timeout", new ReadTimeoutHandler(readTimeout, TimeUnit.SECONDS));
- pipeline.addLast("handler", new HttpClientHandler(file));
- }
- }
-}
[2/4] tajo git commit: TAJO-2122: PullServer as an Auxiliary service of Yarn.
Posted by ji...@apache.org.
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerUtil.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerUtil.java
index c90f1aa..75f6080 100644
--- a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerUtil.java
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerUtil.java
@@ -18,22 +18,47 @@
package org.apache.tajo.pullserver;
-import org.apache.commons.lang.reflect.MethodUtils;
+import com.google.common.base.Preconditions;
+import com.google.common.cache.LoadingCache;
+import com.google.gson.Gson;
+import io.netty.handler.codec.http.QueryStringDecoder;
+import org.apache.commons.codec.binary.Base64;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.pullserver.PullServerConstants.Param;
+import org.apache.tajo.pullserver.retriever.FileChunk;
+import org.apache.tajo.pullserver.retriever.FileChunkMeta;
+import org.apache.tajo.pullserver.retriever.IndexCacheKey;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.RowStoreUtil.RowStoreDecoder;
+import org.apache.tajo.storage.index.bst.BSTIndex.BSTIndexReader;
+import org.apache.tajo.util.Pair;
-import java.io.FileDescriptor;
-import java.lang.reflect.Method;
+import java.io.*;
+import java.net.URI;
+import java.net.URLEncoder;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
public class PullServerUtil {
private static final Log LOG = LogFactory.getLog(PullServerUtil.class);
private static boolean nativeIOPossible = false;
- private static Method posixFadviseIfPossible;
static {
- if (NativeIO.isAvailable() && loadNativeIO()) {
+ if (NativeIO.isAvailable()) {
nativeIOPossible = true;
} else {
LOG.warn("Unable to load hadoop nativeIO");
@@ -53,7 +78,7 @@ public class PullServerUtil {
long offset, long len, int flags) {
if (nativeIOPossible) {
try {
- posixFadviseIfPossible.invoke(null, identifier, fd, offset, len, flags);
+ NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(identifier, fd, offset, len, flags);
} catch (Throwable t) {
nativeIOPossible = false;
LOG.warn("Failed to manage OS cache for " + identifier, t);
@@ -61,30 +86,643 @@ public class PullServerUtil {
}
}
- /* load hadoop native method if possible */
- private static boolean loadNativeIO() {
- boolean loaded = true;
- if (nativeIOPossible) return loaded;
+ public static Path getBaseOutputDir(String queryId, String executionBlockSequenceId) {
+ return StorageUtil.concatPath(
+ queryId,
+ "output",
+ executionBlockSequenceId);
+ }
- Class[] parameters = {String.class, FileDescriptor.class, Long.TYPE, Long.TYPE, Integer.TYPE};
- try {
- Method getCacheManipulator = MethodUtils.getAccessibleMethod(NativeIO.POSIX.class, "getCacheManipulator", new Class[0]);
- Class posixClass;
- if (getCacheManipulator != null) {
- Object posix = MethodUtils.invokeStaticMethod(NativeIO.POSIX.class, "getCacheManipulator", null);
- posixClass = posix.getClass();
+ public static Path getBaseInputDir(String queryId, String executionBlockId) {
+ return StorageUtil.concatPath(
+ queryId,
+ "in",
+ executionBlockId);
+ }
+
+ public static List<String> splitMaps(List<String> mapq) {
+ if (null == mapq) {
+ return null;
+ }
+ final List<String> ret = new ArrayList<>();
+ for (String s : mapq) {
+ Collections.addAll(ret, s.split(","));
+ }
+ return ret;
+ }
+
+
+ public static boolean isChunkRequest(String requestType) {
+ return requestType.equals(PullServerConstants.CHUNK_REQUEST_PARAM_STRING);
+ }
+
+ public static boolean isMetaRequest(String requestType) {
+ return requestType.equals(PullServerConstants.META_REQUEST_PARAM_STRING);
+ }
+
+ public static boolean isRangeShuffle(String shuffleType) {
+ return shuffleType.equals(PullServerConstants.RANGE_SHUFFLE_PARAM_STRING);
+ }
+
+ public static boolean isHashShuffle(String shuffleType) {
+ return shuffleType.equals(PullServerConstants.HASH_SHUFFLE_PARAM_STRING)
+ || shuffleType.equals(PullServerConstants.SCATTERED_HASH_SHUFFLE_PARAM_STRING);
+ }
+
+ public static class PullServerParams extends HashMap<String, List<String>> {
+
+ public PullServerParams(URI uri) {
+ this(uri.toString());
+ }
+
+ public PullServerParams(String uri) {
+ super(new QueryStringDecoder(uri).parameters());
+ }
+
+ public boolean contains(Param param) {
+ return containsKey(param.key());
+ }
+
+ public List<String> get(Param param) {
+ return get(param.key());
+ }
+
+ private String checkAndGetFirstParam(Param param) {
+ Preconditions.checkArgument(contains(param), "Missing " + param.name());
+ Preconditions.checkArgument(get(param).size() == 1, "Too many params: " + param.name());
+ return get(param).get(0);
+ }
+
+ private List<String> checkAndGet(Param param) {
+ Preconditions.checkArgument(contains(param), "Missing " + param.name());
+ return get(param);
+ }
+
+ public String requestType() {
+ return checkAndGetFirstParam(Param.REQUEST_TYPE);
+ }
+
+ public String shuffleType() {
+ return checkAndGetFirstParam(Param.SHUFFLE_TYPE);
+ }
+
+ public String queryId() {
+ return checkAndGetFirstParam(Param.QUERY_ID);
+ }
+
+ public String ebId() {
+ return checkAndGetFirstParam(Param.EB_ID);
+ }
+
+ public long offset() {
+ return contains(Param.OFFSET) && get(Param.OFFSET).size() == 1 ?
+ Long.parseLong(get(Param.OFFSET).get(0)) : -1L;
+ }
+
+ public long length() {
+ return contains(Param.LENGTH) && get(Param.LENGTH).size() == 1 ?
+ Long.parseLong(get(Param.LENGTH).get(0)) : -1L;
+ }
+
+ public String startKey() {
+ return checkAndGetFirstParam(Param.START);
+ }
+
+ public String endKey() {
+ return checkAndGetFirstParam(Param.END);
+ }
+
+ public boolean last() {
+ return contains(Param.FINAL);
+ }
+
+ public String partId() {
+ return checkAndGetFirstParam(Param.PART_ID);
+ }
+
+ public List<String> taskAttemptIds() {
+ return checkAndGet(Param.TASK_ID);
+ }
+ }
+
+ public static class PullServerRequestURIBuilder {
+ private final StringBuilder builder = new StringBuilder("http://");
+ private String requestType;
+ private String shuffleType;
+ private String queryId;
+ private Integer ebId;
+ private Integer partId;
+ private List<Integer> taskIds;
+ private List<Integer> attemptIds;
+ private List<String> taskAttemptIds;
+ private Long offset;
+ private Long length;
+ private String startKeyBase64;
+ private String endKeyBase64;
+ private boolean last;
+ private final int maxUrlLength;
+
+ public PullServerRequestURIBuilder(String pullServerAddr, int pullServerPort, int maxUrlLength) {
+ this(pullServerAddr, Integer.toString(pullServerPort), maxUrlLength);
+ }
+
+ public PullServerRequestURIBuilder(String pullServerAddr, String pullServerPort, int maxUrlLength) {
+ builder.append(pullServerAddr).append(":").append(pullServerPort).append("/?");
+ this.maxUrlLength = maxUrlLength;
+ }
+
+ public List<URI> build(boolean includeTasks) {
+ append(Param.REQUEST_TYPE, requestType)
+ .append(Param.QUERY_ID, queryId)
+ .append(Param.EB_ID, ebId)
+ .append(Param.PART_ID, partId)
+ .append(Param.SHUFFLE_TYPE, shuffleType);
+
+ if (startKeyBase64 != null) {
+
+ try {
+ append(Param.START, URLEncoder.encode(startKeyBase64, "utf-8"))
+ .append(Param.END, URLEncoder.encode(endKeyBase64, "utf-8"));
+ } catch (UnsupportedEncodingException e) {
+ throw new RuntimeException(e);
+ }
+
+ if (last) {
+ append(Param.FINAL, Boolean.toString(last));
+ }
+ }
+
+ if (length != null) {
+ append(Param.OFFSET, offset.toString())
+ .append(Param.LENGTH, length.toString());
+ }
+
+ List<URI> results = new ArrayList<>();
+ if (!includeTasks || isHashShuffle(shuffleType)) {
+ results.add(URI.create(builder.toString()));
} else {
- posixClass = NativeIO.POSIX.class;
+ builder.append(Param.TASK_ID.key()).append("=");
+ List<String> taskAttemptIds = this.taskAttemptIds;
+ if (taskAttemptIds == null) {
+
+ // Sort task ids to increase cache hit in pull server
+ taskAttemptIds = IntStream.range(0, taskIds.size())
+ .mapToObj(i -> new Pair<>(taskIds.get(i), attemptIds.get(i)))
+ .sorted((p1, p2) -> p1.getFirst() - p2.getFirst())
+ // In the case of hash shuffle each partition has single shuffle file per worker.
+ // TODO If file is large, consider multiple fetching(shuffle file can be split)
+ .filter(pair -> pair.getFirst() >= 0)
+ .map(pair -> pair.getFirst() + "_" + pair.getSecond())
+ .collect(Collectors.toList());
+ }
+
+ // If the get request is longer than 2000 characters,
+ // the long request uri may cause HTTP Status Code - 414 Request-URI Too Long.
+ // Refer to http://www.w3.org/Protocols/rfc2616/rfc2616-sec10.html#sec10.4.15
+ // The below code transforms a long request to multiple requests.
+ List<String> taskIdsParams = new ArrayList<>();
+ StringBuilder taskIdListBuilder = new StringBuilder();
+
+ boolean first = true;
+ for (int i = 0; i < taskAttemptIds.size(); i++) {
+ if (!first) {
+ taskIdListBuilder.append(",");
+ }
+ first = false;
+
+ if (builder.length() + taskIdListBuilder.length() > maxUrlLength) {
+ taskIdsParams.add(taskIdListBuilder.toString());
+ taskIdListBuilder = new StringBuilder(taskAttemptIds.get(i));
+ } else {
+ taskIdListBuilder.append(taskAttemptIds.get(i));
+ }
+ }
+ // if the url params remain
+ if (taskIdListBuilder.length() > 0) {
+ taskIdsParams.add(taskIdListBuilder.toString());
+ }
+ for (String param : taskIdsParams) {
+ results.add(URI.create(builder + param));
+ }
+ }
+
+ return results;
+ }
+
+ private PullServerRequestURIBuilder append(Param key, Object val) {
+ builder.append(key.key())
+ .append("=")
+ .append(val)
+ .append("&");
+
+ return this;
+ }
+
+ public PullServerRequestURIBuilder setRequestType(String type) {
+ this.requestType = type;
+ return this;
+ }
+
+ public PullServerRequestURIBuilder setShuffleType(String shuffleType) {
+ this.shuffleType = shuffleType;
+ return this;
+ }
+
+ public PullServerRequestURIBuilder setQueryId(String queryId) {
+ this.queryId = queryId;
+ return this;
+ }
+
+ public PullServerRequestURIBuilder setEbId(String ebId) {
+ this.ebId = Integer.parseInt(ebId);
+ return this;
+ }
+
+ public PullServerRequestURIBuilder setEbId(Integer ebId) {
+ this.ebId = ebId;
+ return this;
+ }
+
+ public PullServerRequestURIBuilder setPartId(String partId) {
+ this.partId = Integer.parseInt(partId);
+ return this;
+ }
+
+ public PullServerRequestURIBuilder setPartId(Integer partId) {
+ this.partId = partId;
+ return this;
+ }
+
+ public PullServerRequestURIBuilder setTaskIds(List<Integer> taskIds) {
+ this.taskIds = taskIds;
+ return this;
+ }
+
+ public PullServerRequestURIBuilder setAttemptIds(List<Integer> attemptIds) {
+ this.attemptIds = attemptIds;
+ return this;
+ }
+
+ public PullServerRequestURIBuilder setTaskAttemptIds(List<String> taskAttemptIds) {
+ this.taskAttemptIds = taskAttemptIds;
+ return this;
+ }
+
+ public PullServerRequestURIBuilder setOffset(long offset) {
+ this.offset = offset;
+ return this;
+ }
+
+ public PullServerRequestURIBuilder setLength(long length) {
+ this.length = length;
+ return this;
+ }
+
+ public PullServerRequestURIBuilder setStartKeyBase64(String startKeyBase64) {
+ this.startKeyBase64 = startKeyBase64;
+ return this;
+ }
+
+ public PullServerRequestURIBuilder setEndKeyBase64(String endKeyBase64) {
+ this.endKeyBase64 = endKeyBase64;
+ return this;
+ }
+
+ public PullServerRequestURIBuilder setLastInclude(boolean last) {
+ this.last = last;
+ return this;
+ }
+ }
+
+ public static boolean useExternalPullServerService(TajoConf conf) {
+ // TODO: add more service types like mesos
+ return TajoPullServerService.isStandalone()
+ || conf.getBoolVar(ConfVars.YARN_SHUFFLE_SERVICE_ENABLED);
+ }
+
+ private static FileChunkMeta searchFileChunkMeta(String queryId,
+ String ebSeqId,
+ String taskId,
+ Path outDir,
+ String startKey,
+ String endKey,
+ boolean last,
+ LoadingCache<IndexCacheKey, BSTIndexReader> indexReaderCache,
+ int lowCacheHitCheckThreshold) throws IOException, ExecutionException {
+ SearchResult result = searchCorrespondPart(queryId, ebSeqId, outDir, startKey, endKey, last,
+ indexReaderCache, lowCacheHitCheckThreshold);
+ // Do not send file chunks of 0 length
+ if (result != null) {
+ long startOffset = result.startOffset;
+ long endOffset = result.endOffset;
+
+ FileChunkMeta chunk = new FileChunkMeta(startOffset, endOffset - startOffset, ebSeqId, taskId);
+
+ if (LOG.isDebugEnabled()) LOG.debug("Retrieve File Chunk: " + chunk);
+ return chunk;
+ } else {
+ return null;
+ }
+ }
+
+ private static FileChunk searchFileChunk(String queryId,
+ String ebSeqId,
+ Path outDir,
+ String startKey,
+ String endKey,
+ boolean last,
+ LoadingCache<IndexCacheKey, BSTIndexReader> indexReaderCache,
+ int lowCacheHitCheckThreshold) throws IOException, ExecutionException {
+
+ final SearchResult result = searchCorrespondPart(queryId, ebSeqId, outDir, startKey, endKey, last,
+ indexReaderCache, lowCacheHitCheckThreshold);
+ if (result != null) {
+ long startOffset = result.startOffset;
+ long endOffset = result.endOffset;
+ FileChunk chunk = new FileChunk(result.data, startOffset, endOffset - startOffset);
+
+ if (LOG.isDebugEnabled()) LOG.debug("Retrieve File Chunk: " + chunk);
+ return chunk;
+ } else {
+ return null;
+ }
+ }
+
+ private static class SearchResult {
+ File data;
+ long startOffset;
+ long endOffset;
+
+ public SearchResult(File data, long startOffset, long endOffset) {
+ this.data = data;
+ this.startOffset = startOffset;
+ this.endOffset = endOffset;
+ }
+ }
+
+ private static SearchResult searchCorrespondPart(String queryId,
+ String ebSeqId,
+ Path outDir,
+ String startKey,
+ String endKey,
+ boolean last,
+ LoadingCache<IndexCacheKey, BSTIndexReader> indexReaderCache,
+ int lowCacheHitCheckThreshold) throws IOException, ExecutionException {
+ BSTIndexReader idxReader = indexReaderCache.get(new IndexCacheKey(outDir, queryId, ebSeqId));
+ idxReader.retain();
+
+ File data;
+ long startOffset;
+ long endOffset;
+ try {
+ if (LOG.isDebugEnabled()) {
+ if (indexReaderCache.size() > lowCacheHitCheckThreshold && indexReaderCache.stats().hitRate() < 0.5) {
+ LOG.debug("Too low cache hit rate: " + indexReaderCache.stats());
+ }
+ }
+
+ Tuple indexedFirst = idxReader.getFirstKey();
+ Tuple indexedLast = idxReader.getLastKey();
+
+ if (indexedFirst == null && indexedLast == null) { // if # of rows is zero
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("There is no contents");
+ }
+ return null;
+ }
+
+ byte[] startBytes = Base64.decodeBase64(startKey);
+ byte[] endBytes = Base64.decodeBase64(endKey);
+
+
+ Tuple start;
+ Tuple end;
+ Schema keySchema = idxReader.getKeySchema();
+ RowStoreDecoder decoder = RowStoreUtil.createDecoder(keySchema);
+
+ try {
+ start = decoder.toTuple(startBytes);
+ } catch (Throwable t) {
+ throw new IllegalArgumentException("StartKey: " + startKey
+ + ", decoded byte size: " + startBytes.length, t);
+ }
+
+ try {
+ end = decoder.toTuple(endBytes);
+ } catch (Throwable t) {
+ throw new IllegalArgumentException("EndKey: " + endKey
+ + ", decoded byte size: " + endBytes.length, t);
}
- posixFadviseIfPossible = MethodUtils.getAccessibleMethod(posixClass, "posixFadviseIfPossible", parameters);
- } catch (Throwable e) {
- loaded = false;
- LOG.warn("Failed to access posixFadviseIfPossible :" + e.getMessage(), e);
+
+ data = new File(URI.create(outDir.toUri() + "/output"));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("GET Request for " + data.getAbsolutePath() + " (start=" + start + ", end=" + end +
+ (last ? ", last=true" : "") + ")");
+ }
+
+ TupleComparator comparator = idxReader.getComparator();
+
+ if (comparator.compare(end, indexedFirst) < 0 ||
+ comparator.compare(indexedLast, start) < 0) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Out of Scope (indexed data [" + indexedFirst + ", " + indexedLast +
+ "], but request start:" + start + ", end: " + end);
+ }
+ return null;
+ }
+
+ try {
+ idxReader.init();
+ startOffset = idxReader.find(start);
+ } catch (IOException ioe) {
+ LOG.error("State Dump (the requested range: "
+ + "[" + start + ", " + end + ")" + ", idx min: "
+ + idxReader.getFirstKey() + ", idx max: "
+ + idxReader.getLastKey());
+ throw ioe;
+ }
+ try {
+ endOffset = idxReader.find(end);
+ if (endOffset == -1) {
+ endOffset = idxReader.find(end, true);
+ }
+ } catch (IOException ioe) {
+ LOG.error("State Dump (the requested range: "
+ + "[" + start + ", " + end + ")" + ", idx min: "
+ + idxReader.getFirstKey() + ", idx max: "
+ + idxReader.getLastKey());
+ throw ioe;
+ }
+
+ // if startOffset == -1 then case 2-1 or case 3
+ if (startOffset == -1) { // this is a hack
+ // if case 2-1 or case 3
+ try {
+ startOffset = idxReader.find(start, true);
+ } catch (IOException ioe) {
+ LOG.error("State Dump (the requested range: "
+ + "[" + start + ", " + end + ")" + ", idx min: "
+ + idxReader.getFirstKey() + ", idx max: "
+ + idxReader.getLastKey());
+ throw ioe;
+ }
+ }
+
+ if (startOffset == -1) {
+ throw new IllegalStateException("startOffset " + startOffset + " is negative \n" +
+ "State Dump (the requested range: "
+ + "[" + start + ", " + end + ")" + ", idx min: " + idxReader.getFirstKey() + ", idx max: "
+ + idxReader.getLastKey());
+ }
+
+ // if greater than indexed values
+ if (last || (endOffset == -1
+ && comparator.compare(idxReader.getLastKey(), end) < 0)) {
+ endOffset = data.length();
+ }
+ } finally {
+ idxReader.release();
}
- if (posixFadviseIfPossible == null) {
- loaded = false;
+ return new SearchResult(data, startOffset, endOffset);
+ }
+
+ /**
+ * Retrieve meta information of file chunks which correspond to the requested URI.
+ * Only meta information for the file chunks which has non-zero length are retrieved.
+ *
+ * @param conf
+ * @param lDirAlloc
+ * @param localFS
+ * @param params
+ * @param gson
+ * @param indexReaderCache
+ * @param lowCacheHitCheckThreshold
+ * @return
+ * @throws IOException
+ * @throws ExecutionException
+ */
+ public static List<String> getJsonMeta(final TajoConf conf,
+ final LocalDirAllocator lDirAlloc,
+ final FileSystem localFS,
+ final PullServerParams params,
+ final Gson gson,
+ final LoadingCache<IndexCacheKey, BSTIndexReader> indexReaderCache,
+ final int lowCacheHitCheckThreshold)
+ throws IOException, ExecutionException {
+ final List<String> taskIds = PullServerUtil.splitMaps(params.taskAttemptIds());
+ final Path queryBaseDir = PullServerUtil.getBaseOutputDir(params.queryId(), params.ebId());
+ final List<String> jsonMetas = new ArrayList<>();
+
+ for (String eachTaskId : taskIds) {
+ Path outputPath = StorageUtil.concatPath(queryBaseDir, eachTaskId, "output");
+ if (!lDirAlloc.ifExists(outputPath.toString(), conf)) {
+ LOG.warn("Range shuffle - file not exist. " + outputPath);
+ continue;
+ }
+ Path path = localFS.makeQualified(lDirAlloc.getLocalPathToRead(outputPath.toString(), conf));
+ FileChunkMeta meta;
+ meta = PullServerUtil.searchFileChunkMeta(params.queryId(), params.ebId(), eachTaskId, path,
+ params.startKey(), params.endKey(), params.last(), indexReaderCache, lowCacheHitCheckThreshold);
+ if (meta != null && meta.getLength() > 0) {
+ String jsonStr = gson.toJson(meta, FileChunkMeta.class);
+ jsonMetas.add(jsonStr);
+ }
+ }
+ return jsonMetas;
+ }
+
+ /**
+ * Retrieve file chunks which correspond to the requested URI.
+ * Only the file chunks which has non-zero length are retrieved.
+ *
+ * @param conf
+ * @param lDirAlloc
+ * @param localFS
+ * @param params
+ * @param indexReaderCache
+ * @param lowCacheHitCheckThreshold
+ * @return
+ * @throws IOException
+ * @throws ExecutionException
+ */
+ public static List<FileChunk> getFileChunks(final TajoConf conf,
+ final LocalDirAllocator lDirAlloc,
+ final FileSystem localFS,
+ final PullServerParams params,
+ final LoadingCache<IndexCacheKey, BSTIndexReader> indexReaderCache,
+ final int lowCacheHitCheckThreshold)
+ throws IOException, ExecutionException {
+ final List<FileChunk> chunks = new ArrayList<>();
+
+ final String queryId = params.queryId();
+ final String shuffleType = params.shuffleType();
+ final String sid = params.ebId();
+
+ final long offset = params.offset();
+ final long length = params.length();
+
+ final Path queryBaseDir = PullServerUtil.getBaseOutputDir(queryId, sid);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("PullServer request param: shuffleType=" + shuffleType + ", sid=" + sid);
+
+ // the working dir of tajo worker for each query
+ LOG.debug("PullServer baseDir: " + conf.get(ConfVars.WORKER_TEMPORAL_DIR.varname) + "/" + queryBaseDir);
+ }
+
+ // if a stage requires a range shuffle
+ if (PullServerUtil.isRangeShuffle(shuffleType)) {
+ final List<String> taskIdList = params.taskAttemptIds();
+ final List<String> taskIds = PullServerUtil.splitMaps(taskIdList);
+
+ final String startKey = params.startKey();
+ final String endKey = params.endKey();
+ final boolean last = params.last();
+
+ long before = System.currentTimeMillis();
+ for (String eachTaskId : taskIds) {
+ Path outputPath = StorageUtil.concatPath(queryBaseDir, eachTaskId, "output");
+ if (!lDirAlloc.ifExists(outputPath.toString(), conf)) {
+ LOG.warn(outputPath + " does not exist.");
+ continue;
+ }
+ Path path = localFS.makeQualified(lDirAlloc.getLocalPathToRead(outputPath.toString(), conf));
+
+ FileChunk chunk = PullServerUtil.searchFileChunk(queryId, sid, path, startKey, endKey, last, indexReaderCache,
+ lowCacheHitCheckThreshold);
+ if (chunk != null) {
+ chunks.add(chunk);
+ }
+ }
+ long after = System.currentTimeMillis();
+ LOG.info("Index lookup time: " + (after - before) + " ms");
+
+ // if a stage requires a hash shuffle or a scattered hash shuffle
+ } else if (PullServerUtil.isHashShuffle(shuffleType)) {
+
+ final String partId = params.partId();
+ int partParentId = HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), conf);
+ Path partPath = StorageUtil.concatPath(queryBaseDir, "hash-shuffle", String.valueOf(partParentId), partId);
+ if (!lDirAlloc.ifExists(partPath.toString(), conf)) {
+ throw new FileNotFoundException(partPath.toString());
+ }
+
+ Path path = localFS.makeQualified(lDirAlloc.getLocalPathToRead(partPath.toString(), conf));
+
+ File file = new File(path.toUri());
+ long startPos = (offset >= 0 && length >= 0) ? offset : 0;
+ long readLen = (offset >= 0 && length >= 0) ? length : file.length();
+
+ if (startPos >= file.length()) {
+ String errorMessage = "Start pos[" + startPos + "] great than file length [" + file.length() + "]";
+ throw new EOFException(errorMessage);
+ }
+ FileChunk chunk = new FileChunk(file, startPos, readLen);
+ chunks.add(chunk);
+ } else {
+ throw new IllegalArgumentException(shuffleType);
}
- return loaded;
+ return chunks.stream().filter(c -> c.length() > 0).collect(Collectors.toList());
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServer.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServer.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServer.java
index 4609712..069e660 100644
--- a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServer.java
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServer.java
@@ -54,11 +54,6 @@ public class TajoPullServer extends CompositeService {
start();
}
- public void start() {
- super.start();
-
- }
-
public static void main(String[] args) throws Exception {
StringUtils.startupShutdownMessage(TajoPullServerService.PullServer.class, args, LOG);
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
index cbeba52..aa16f87 100644
--- a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
@@ -22,8 +22,9 @@ import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalListener;
-import com.google.common.collect.Lists;
+import com.google.gson.Gson;
import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.group.ChannelGroup;
@@ -31,12 +32,13 @@ import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.*;
+import io.netty.handler.codec.http.HttpHeaders.Names;
+import io.netty.handler.codec.http.HttpHeaders.Values;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.GlobalEventExecutor;
-import org.apache.commons.codec.binary.Base64;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -56,42 +58,34 @@ import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
import org.apache.hadoop.security.ssl.SSLFactory;
import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.catalog.Schema;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.exception.InvalidURLException;
+import org.apache.tajo.exception.TajoInternalError;
+import org.apache.tajo.pullserver.PullServerUtil.PullServerParams;
import org.apache.tajo.pullserver.retriever.FileChunk;
+import org.apache.tajo.pullserver.retriever.IndexCacheKey;
import org.apache.tajo.rpc.NettyUtils;
-import org.apache.tajo.storage.*;
-import org.apache.tajo.storage.RowStoreUtil.RowStoreDecoder;
import org.apache.tajo.storage.index.bst.BSTIndex;
import org.apache.tajo.storage.index.bst.BSTIndex.BSTIndexReader;
-import org.apache.tajo.unit.StorageUnit;
import org.apache.tajo.util.TajoIdUtils;
import java.io.*;
import java.net.InetSocketAddress;
-import java.net.URI;
-import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+
+import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
public class TajoPullServerService extends AbstractService {
private static final Log LOG = LogFactory.getLog(TajoPullServerService.class);
- public static final String SHUFFLE_MANAGE_OS_CACHE = "tajo.pullserver.manage.os.cache";
- public static final boolean DEFAULT_SHUFFLE_MANAGE_OS_CACHE = true;
-
- public static final String SHUFFLE_READAHEAD_BYTES = "tajo.pullserver.readahead.bytes";
- public static final int DEFAULT_SHUFFLE_READAHEAD_BYTES = 4 * 1024 * 1024;
-
private int port;
private ServerBootstrap selector;
private final ChannelGroup accepted = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
@@ -99,7 +93,6 @@ public class TajoPullServerService extends AbstractService {
private int sslFileBufferSize;
private int maxUrlLength;
- private ApplicationId appId;
private FileSystem localFS;
/**
@@ -110,62 +103,14 @@ public class TajoPullServerService extends AbstractService {
private int readaheadLength;
private ReadaheadPool readaheadPool = ReadaheadPool.getInstance();
- public static final String PULLSERVER_SERVICEID = "tajo.pullserver";
-
- private static final Map<String,String> userRsrc =
- new ConcurrentHashMap<>();
- private String userName;
-
- private static LoadingCache<CacheKey, BSTIndexReader> indexReaderCache = null;
- private static int lowCacheHitCheckThreshold;
-
- public static final String SUFFLE_SSL_FILE_BUFFER_SIZE_KEY =
- "tajo.pullserver.ssl.file.buffer.size";
-
- public static final int DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE = 60 * 1024;
+ private LoadingCache<IndexCacheKey, BSTIndexReader> indexReaderCache = null;
+ private int lowCacheHitCheckThreshold;
private static final boolean STANDALONE;
- private static final AtomicIntegerFieldUpdater<ProcessingStatus> SLOW_FILE_UPDATER;
- private static final AtomicIntegerFieldUpdater<ProcessingStatus> REMAIN_FILE_UPDATER;
-
- public static final String CHUNK_LENGTH_HEADER_NAME = "c";
-
- static class CacheKey {
- private Path path;
- private String queryId;
- private String ebSeqId;
-
- public CacheKey(Path path, String queryId, String ebSeqId) {
- this.path = path;
- this.queryId = queryId;
- this.ebSeqId = ebSeqId;
- }
-
- @Override
- public boolean equals(Object o) {
- if (o instanceof CacheKey) {
- CacheKey other = (CacheKey) o;
- return Objects.equals(this.path, other.path)
- && Objects.equals(this.queryId, other.queryId)
- && Objects.equals(this.ebSeqId, other.ebSeqId);
- }
- return false;
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(path, queryId, ebSeqId);
- }
- }
-
static {
- /* AtomicIntegerFieldUpdater can save the memory usage instead of AtomicInteger instance */
- SLOW_FILE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ProcessingStatus.class, "numSlowFile");
- REMAIN_FILE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ProcessingStatus.class, "remainFiles");
-
- String standalone = System.getenv("TAJO_PULLSERVER_STANDALONE");
- STANDALONE = !StringUtils.isEmpty(standalone) && standalone.equalsIgnoreCase("true");
+ String standalone = System.getenv(PullServerConstants.PULLSERVER_STANDALONE_ENV_KEY);
+ STANDALONE = !StringUtils.isEmpty(standalone) && standalone.equalsIgnoreCase(Boolean.TRUE.toString());
}
@Metrics(name="PullServerShuffleMetrics", about="PullServer output metrics", context="tajo")
@@ -193,7 +138,7 @@ public class TajoPullServerService extends AbstractService {
final ShuffleMetrics metrics;
TajoPullServerService(MetricsSystem ms) {
- super("httpshuffle");
+ super(PullServerConstants.PULLSERVER_SERVICE_NAME);
metrics = ms.register(new ShuffleMetrics());
}
@@ -202,58 +147,33 @@ public class TajoPullServerService extends AbstractService {
this(DefaultMetricsSystem.instance());
}
- public void initApp(String user, ApplicationId appId, ByteBuffer secret) {
- // TODO these bytes should be versioned
- // TODO: Once SHuffle is out of NM, this can use MR APIs
- this.appId = appId;
- this.userName = user;
- userRsrc.put(appId.toString(), user);
- }
-
- public void stopApp(ApplicationId appId) {
- userRsrc.remove(appId.toString());
- }
-
+ // TODO change AbstractService to throw InterruptedException
@Override
- public void init(Configuration conf) {
- try {
- manageOsCache = conf.getBoolean(SHUFFLE_MANAGE_OS_CACHE,
- DEFAULT_SHUFFLE_MANAGE_OS_CACHE);
-
- readaheadLength = conf.getInt(SHUFFLE_READAHEAD_BYTES,
- DEFAULT_SHUFFLE_READAHEAD_BYTES);
+ public void serviceInit(Configuration conf) throws Exception {
+ if (!(conf instanceof TajoConf)) {
+ throw new IllegalArgumentException("Configuration must be a TajoConf instance");
+ }
+ TajoConf tajoConf = (TajoConf) conf;
- int workerNum = conf.getInt("tajo.shuffle.rpc.server.worker-thread-num",
- Runtime.getRuntime().availableProcessors() * 2);
+ manageOsCache = tajoConf.getBoolean(PullServerConstants.SHUFFLE_MANAGE_OS_CACHE,
+ PullServerConstants.DEFAULT_SHUFFLE_MANAGE_OS_CACHE);
- selector = NettyUtils.createServerBootstrap("TajoPullServerService", workerNum)
- .option(ChannelOption.TCP_NODELAY, true)
- .childOption(ChannelOption.ALLOCATOR, NettyUtils.ALLOCATOR)
- .childOption(ChannelOption.TCP_NODELAY, true);
+ readaheadLength = tajoConf.getInt(PullServerConstants.SHUFFLE_READAHEAD_BYTES,
+ PullServerConstants.DEFAULT_SHUFFLE_READAHEAD_BYTES);
- localFS = new LocalFileSystem();
+ int workerNum = tajoConf.getIntVar(ConfVars.SHUFFLE_RPC_SERVER_WORKER_THREAD_NUM);
- maxUrlLength = conf.getInt(ConfVars.PULLSERVER_FETCH_URL_MAX_LENGTH.name(),
- ConfVars.PULLSERVER_FETCH_URL_MAX_LENGTH.defaultIntVal);
+ selector = NettyUtils.createServerBootstrap("TajoPullServerService", workerNum)
+ .option(ChannelOption.TCP_NODELAY, true)
+ .childOption(ChannelOption.ALLOCATOR, NettyUtils.ALLOCATOR)
+ .childOption(ChannelOption.TCP_NODELAY, true);
- conf.setInt(TajoConf.ConfVars.PULLSERVER_PORT.varname
- , conf.getInt(TajoConf.ConfVars.PULLSERVER_PORT.varname, TajoConf.ConfVars.PULLSERVER_PORT.defaultIntVal));
- super.init(conf);
- LOG.info("Tajo PullServer initialized: readaheadLength=" + readaheadLength);
- } catch (Throwable t) {
- LOG.error(t, t);
- }
- }
+ localFS = new LocalFileSystem();
- // TODO change AbstractService to throw InterruptedException
- @Override
- public void serviceInit(Configuration conf) throws Exception {
- if (!(conf instanceof TajoConf)) {
- throw new IllegalArgumentException("Configuration must be a TajoConf instance");
- }
+ maxUrlLength = tajoConf.getIntVar(ConfVars.PULLSERVER_FETCH_URL_MAX_LENGTH);
+ LOG.info("Tajo PullServer initialized: readaheadLength=" + readaheadLength);
ServerBootstrap bootstrap = selector.clone();
- TajoConf tajoConf = (TajoConf)conf;
try {
channelInitializer = new HttpChannelInitializer(tajoConf);
} catch (Exception ex) {
@@ -263,7 +183,7 @@ public class TajoPullServerService extends AbstractService {
.channel(NioServerSocketChannel.class);
port = tajoConf.getIntVar(ConfVars.PULLSERVER_PORT);
- ChannelFuture future = bootstrap.bind(new InetSocketAddress(port))
+ ChannelFuture future = bootstrap.bind(port)
.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE)
.syncUninterruptibly();
@@ -272,8 +192,8 @@ public class TajoPullServerService extends AbstractService {
tajoConf.set(ConfVars.PULLSERVER_PORT.varname, Integer.toString(port));
LOG.info(getName() + " listening on port " + port);
- sslFileBufferSize = conf.getInt(SUFFLE_SSL_FILE_BUFFER_SIZE_KEY,
- DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE);
+ sslFileBufferSize = conf.getInt(PullServerConstants.SUFFLE_SSL_FILE_BUFFER_SIZE_KEY,
+ PullServerConstants.DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE);
int cacheSize = tajoConf.getIntVar(ConfVars.PULLSERVER_CACHE_SIZE);
int cacheTimeout = tajoConf.getIntVar(ConfVars.PULLSERVER_CACHE_TIMEOUT);
@@ -283,10 +203,10 @@ public class TajoPullServerService extends AbstractService {
.expireAfterWrite(cacheTimeout, TimeUnit.MINUTES)
.removalListener(removalListener)
.build(
- new CacheLoader<CacheKey, BSTIndexReader>() {
+ new CacheLoader<IndexCacheKey, BSTIndexReader>() {
@Override
- public BSTIndexReader load(CacheKey key) throws Exception {
- return new BSTIndex(tajoConf).getIndexReader(new Path(key.path, "index"));
+ public BSTIndexReader load(IndexCacheKey key) throws Exception {
+ return new BSTIndex(tajoConf).getIndexReader(new Path(key.getPath(), "index"));
}
}
);
@@ -353,29 +273,31 @@ public class TajoPullServerService extends AbstractService {
}
@Override
- public void stop() {
- try {
- accepted.close();
- if (selector != null) {
- if (selector.group() != null) {
- selector.group().shutdownGracefully();
- }
- if (selector.childGroup() != null) {
- selector.childGroup().shutdownGracefully();
- }
+ public void serviceStop() throws Exception {
+ accepted.close();
+ if (selector != null) {
+ if (selector.group() != null) {
+ selector.group().shutdownGracefully();
}
-
- if (channelInitializer != null) {
- channelInitializer.destroy();
+ if (selector.childGroup() != null) {
+ selector.childGroup().shutdownGracefully();
}
+ }
- localFS.close();
- indexReaderCache.invalidateAll();
- } catch (Throwable t) {
- LOG.error(t, t);
- } finally {
- super.stop();
+ if (channelInitializer != null) {
+ channelInitializer.destroy();
}
+
+ localFS.close();
+ indexReaderCache.invalidateAll();
+
+ super.serviceStop();
+ }
+
+ public List<FileChunk> getFileChunks(TajoConf conf, LocalDirAllocator lDirAlloc, PullServerParams params)
+ throws IOException, ExecutionException {
+ return PullServerUtil.getFileChunks(conf, lDirAlloc, localFS, params, indexReaderCache,
+ lowCacheHitCheckThreshold);
}
class HttpChannelInitializer extends ChannelInitializer<SocketChannel> {
@@ -385,8 +307,7 @@ public class TajoPullServerService extends AbstractService {
public HttpChannelInitializer(TajoConf conf) throws Exception {
PullServer = new PullServer(conf);
- if (conf.getBoolean(ConfVars.SHUFFLE_SSL_ENABLED_KEY.varname,
- ConfVars.SHUFFLE_SSL_ENABLED_KEY.defaultBoolVal)) {
+ if (conf.getBoolVar(ConfVars.SHUFFLE_SSL_ENABLED_KEY)) {
sslFactory = new SSLFactory(SSLFactory.Mode.SERVER, conf);
sslFactory.init();
}
@@ -417,72 +338,13 @@ public class TajoPullServerService extends AbstractService {
}
}
-
- Map<String, ProcessingStatus> processingStatusMap = new ConcurrentHashMap<>();
-
- public void completeFileChunk(FileRegion filePart,
- String requestUri,
- long startTime) {
- ProcessingStatus status = processingStatusMap.get(requestUri);
- if (status != null) {
- status.decrementRemainFiles(filePart, startTime);
- }
- }
-
- class ProcessingStatus {
- String requestUri;
- int numFiles;
- long startTime;
- long makeFileListTime;
- long minTime = Long.MAX_VALUE;
- long maxTime;
- volatile int numSlowFile;
- volatile int remainFiles;
-
- public ProcessingStatus(String requestUri) {
- this.requestUri = requestUri;
- this.startTime = System.currentTimeMillis();
- }
-
- public void setNumFiles(int numFiles) {
- this.numFiles = numFiles;
- this.remainFiles = numFiles;
- }
-
- public void decrementRemainFiles(FileRegion filePart, long fileStartTime) {
- long fileSendTime = System.currentTimeMillis() - fileStartTime;
-
- if (fileSendTime > maxTime) {
- maxTime = fileSendTime;
- }
- if (fileSendTime < minTime) {
- minTime = fileSendTime;
- }
-
- if (fileSendTime > 20 * 1000) {
- LOG.warn("Sending data takes too long. " + fileSendTime + "ms elapsed, " +
- "length:" + (filePart.count() - filePart.position()) + ", URI:" + requestUri);
- SLOW_FILE_UPDATER.compareAndSet(this, numSlowFile, numSlowFile + 1);
- }
-
- REMAIN_FILE_UPDATER.compareAndSet(this, remainFiles, remainFiles - 1);
- if (REMAIN_FILE_UPDATER.get(this) <= 0) {
- processingStatusMap.remove(requestUri);
- if(LOG.isDebugEnabled()) {
- LOG.debug("PullServer processing status: totalTime=" + (System.currentTimeMillis() - startTime) + " ms, "
- + "makeFileListTime=" + makeFileListTime + " ms, minTime=" + minTime + " ms, maxTime=" + maxTime + " ms, "
- + "numFiles=" + numFiles + ", numSlowFile=" + numSlowFile);
- }
- }
- }
- }
-
@ChannelHandler.Sharable
class PullServer extends SimpleChannelInboundHandler<FullHttpRequest> {
private final TajoConf conf;
private final LocalDirAllocator lDirAlloc =
new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);
+ private final Gson gson = new Gson();
public PullServer(TajoConf conf) throws IOException {
this.conf = conf;
@@ -512,7 +374,7 @@ public class TajoPullServerService extends AbstractService {
}
if (request.getMethod() == HttpMethod.DELETE) {
- HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NO_CONTENT);
+ HttpResponse response = new DefaultHttpResponse(HTTP_1_1, HttpResponseStatus.NO_CONTENT);
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
clearIndexCache(request.getUri());
@@ -523,110 +385,117 @@ public class TajoPullServerService extends AbstractService {
}
// Parsing the URL into key-values
- Map<String, List<String>> params = null;
try {
- params = decodeParams(request.getUri());
+ final PullServerParams params = new PullServerParams(request.getUri());
+ if (PullServerUtil.isChunkRequest(params.requestType())) {
+ handleChunkRequest(ctx, request, params);
+ } else {
+ handleMetaRequest(ctx, request, params);
+ }
} catch (Throwable e) {
- LOG.error("Failed to decode uri " + request.getUri());
+ LOG.error("Failed to handle request " + request.getUri());
sendError(ctx, e.getMessage(), HttpResponseStatus.BAD_REQUEST);
return;
}
+ }
- ProcessingStatus processingStatus = new ProcessingStatus(request.getUri());
- processingStatusMap.put(request.getUri(), processingStatus);
-
- String partId = params.get("p").get(0);
- String queryId = params.get("qid").get(0);
- String shuffleType = params.get("type").get(0);
- String sid = params.get("sid").get(0);
-
- final List<String> taskIdList = params.get("ta");
- final List<String> offsetList = params.get("offset");
- final List<String> lengthList = params.get("length");
-
- long offset = (offsetList != null && !offsetList.isEmpty()) ? Long.parseLong(offsetList.get(0)) : -1L;
- long length = (lengthList != null && !lengthList.isEmpty()) ? Long.parseLong(lengthList.get(0)) : -1L;
-
- List<String> taskIds = splitMaps(taskIdList);
-
- Path queryBaseDir = getBaseOutputDir(queryId, sid);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("PullServer request param: shuffleType=" + shuffleType + ", sid=" + sid + ", partId=" + partId
- + ", taskIds=" + taskIdList);
-
- // the working dir of tajo worker for each query
- LOG.debug("PullServer baseDir: " + conf.get(ConfVars.WORKER_TEMPORAL_DIR.varname) + "/" + queryBaseDir);
+ /**
+ * Upon a request from TajoWorker, this method clears index cache for fetching data of an execution block.
+ * It is called whenever an execution block is completed.
+ *
+ * @param uri query URI which indicates the execution block id
+ * @throws IOException
+ * @throws InvalidURLException
+ */
+ public void clearIndexCache(String uri)
+ throws IOException, InvalidURLException {
+ // Simply parse the given uri
+ String[] tokens = uri.split("=");
+ if (tokens.length != 2 || !tokens[0].equals("ebid")) {
+ throw new IllegalArgumentException("invalid params: " + uri);
}
-
- final List<FileChunk> chunks = Lists.newArrayList();
-
- // if a stage requires a range shuffle
- if (shuffleType.equals("r")) {
- final String startKey = params.get("start").get(0);
- final String endKey = params.get("end").get(0);
- final boolean last = params.get("final") != null;
-
- long before = System.currentTimeMillis();
- for (String eachTaskId : taskIds) {
- Path outputPath = StorageUtil.concatPath(queryBaseDir, eachTaskId, "output");
- if (!lDirAlloc.ifExists(outputPath.toString(), conf)) {
- LOG.warn(outputPath + "does not exist.");
- continue;
- }
- Path path = localFS.makeQualified(lDirAlloc.getLocalPathToRead(outputPath.toString(), conf));
-
- FileChunk chunk;
- try {
- chunk = getFileChunks(queryId, sid, path, startKey, endKey, last);
- } catch (Throwable t) {
- LOG.error("ERROR Request: " + request.getUri(), t);
- sendError(ctx, "Cannot get file chunks to be sent", HttpResponseStatus.BAD_REQUEST);
- return;
+ ExecutionBlockId ebId = TajoIdUtils.createExecutionBlockId(tokens[1]);
+ String queryId = ebId.getQueryId().toString();
+ String ebSeqId = Integer.toString(ebId.getId());
+ List<IndexCacheKey> removed = new ArrayList<>();
+ synchronized (indexReaderCache) {
+ for (Entry<IndexCacheKey, BSTIndexReader> e : indexReaderCache.asMap().entrySet()) {
+ IndexCacheKey key = e.getKey();
+ if (key.getQueryId().equals(queryId) && key.getEbSeqId().equals(ebSeqId)) {
+ e.getValue().forceClose();
+ removed.add(e.getKey());
}
- if (chunk != null) {
- chunks.add(chunk);
+ }
+ indexReaderCache.invalidateAll(removed);
+ }
+ removed.clear();
+ synchronized (waitForRemove) {
+ for (Entry<IndexCacheKey, BSTIndexReader> e : waitForRemove.entrySet()) {
+ IndexCacheKey key = e.getKey();
+ if (key.getQueryId().equals(queryId) && key.getEbSeqId().equals(ebSeqId)) {
+ e.getValue().forceClose();
+ removed.add(e.getKey());
}
}
- long after = System.currentTimeMillis();
- LOG.info("Index lookup time: " + (after - before) + " ms");
-
- // if a stage requires a hash shuffle or a scattered hash shuffle
- } else if (shuffleType.equals("h") || shuffleType.equals("s")) {
- int partParentId = HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), conf);
- Path partPath = StorageUtil.concatPath(queryBaseDir, "hash-shuffle", String.valueOf(partParentId), partId);
- if (!lDirAlloc.ifExists(partPath.toString(), conf)) {
- LOG.warn("Partition shuffle file not exists: " + partPath);
- sendError(ctx, HttpResponseStatus.NO_CONTENT);
- return;
+ for (IndexCacheKey eachKey : removed) {
+ waitForRemove.remove(eachKey);
}
+ }
+ }
+
+ private void handleMetaRequest(ChannelHandlerContext ctx, FullHttpRequest request, final PullServerParams params)
+ throws IOException, ExecutionException {
+ final List<String> jsonMetas;
+ try {
+ jsonMetas = PullServerUtil.getJsonMeta(conf, lDirAlloc, localFS, params, gson, indexReaderCache,
+ lowCacheHitCheckThreshold);
+ } catch (FileNotFoundException e) {
+ sendError(ctx, e.getMessage(), HttpResponseStatus.NO_CONTENT);
+ return;
+ } catch (IOException | IllegalArgumentException e) { // IOException, EOFException, IllegalArgumentException
+ sendError(ctx, e.getMessage(), HttpResponseStatus.BAD_REQUEST);
+ return;
+ } catch (ExecutionException e) {
+ // There are some problems in index cache
+ throw new TajoInternalError(e.getCause());
+ }
- Path path = localFS.makeQualified(lDirAlloc.getLocalPathToRead(partPath.toString(), conf));
+ FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, HttpResponseStatus.OK,
+ Unpooled.copiedBuffer(gson.toJson(jsonMetas), CharsetUtil.UTF_8));
+ response.headers().set(Names.CONTENT_TYPE, "application/json; charset=UTF-8");
+ HttpHeaders.setContentLength(response, response.content().readableBytes());
+ if (HttpHeaders.isKeepAlive(request)) {
+ response.headers().set(Names.CONNECTION, Values.KEEP_ALIVE);
+ }
+ ChannelFuture writeFuture = ctx.writeAndFlush(response);
- File file = new File(path.toUri());
- long startPos = (offset >= 0 && length >= 0) ? offset : 0;
- long readLen = (offset >= 0 && length >= 0) ? length : file.length();
+ // Decide whether to close the connection or not.
+ if (!HttpHeaders.isKeepAlive(request)) {
+ // Close the connection when the whole content is written out.
+ writeFuture.addListener(ChannelFutureListener.CLOSE);
+ }
+ }
- if (startPos >= file.length()) {
- String errorMessage = "Start pos[" + startPos + "] great than file length [" + file.length() + "]";
- LOG.error(errorMessage);
- sendError(ctx, errorMessage, HttpResponseStatus.BAD_REQUEST);
- return;
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("RequestURL: " + request.getUri() + ", fileLen=" + file.length());
- }
- FileChunk chunk = new FileChunk(file, startPos, readLen);
- chunks.add(chunk);
- } else {
- LOG.error("Unknown shuffle type: " + shuffleType);
- sendError(ctx, "Unknown shuffle type:" + shuffleType, HttpResponseStatus.BAD_REQUEST);
+ private void handleChunkRequest(ChannelHandlerContext ctx, FullHttpRequest request, final PullServerParams params)
+ throws IOException {
+ final List<FileChunk> chunks;
+ try {
+ chunks = PullServerUtil.getFileChunks(conf, lDirAlloc, localFS, params, indexReaderCache,
+ lowCacheHitCheckThreshold);
+ } catch (FileNotFoundException e) {
+ sendError(ctx, e.getMessage(), HttpResponseStatus.NO_CONTENT);
+ return;
+ } catch (IOException | IllegalArgumentException e) { // IOException, EOFException, IllegalArgumentException
+ sendError(ctx, e.getMessage(), HttpResponseStatus.BAD_REQUEST);
return;
+ } catch (ExecutionException e) {
+ // There are some problems in index cache
+ throw new TajoInternalError(e.getCause());
}
// Write the content.
if (chunks.size() == 0) {
- HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NO_CONTENT);
+ HttpResponse response = new DefaultHttpResponse(HTTP_1_1, HttpResponseStatus.NO_CONTENT);
if (!HttpHeaders.isKeepAlive(request)) {
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
@@ -637,7 +506,7 @@ public class TajoPullServerService extends AbstractService {
} else {
FileChunk[] file = chunks.toArray(new FileChunk[chunks.size()]);
ChannelFuture writeFuture = null;
- HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
+ HttpResponse response = new DefaultHttpResponse(HTTP_1_1, HttpResponseStatus.OK);
long totalSize = 0;
StringBuilder sb = new StringBuilder();
for (FileChunk chunk : file) {
@@ -645,7 +514,7 @@ public class TajoPullServerService extends AbstractService {
sb.append(Long.toString(chunk.length())).append(",");
}
sb.deleteCharAt(sb.length() - 1);
- HttpHeaders.addHeader(response, CHUNK_LENGTH_HEADER_NAME, sb.toString());
+ HttpHeaders.addHeader(response, PullServerConstants.CHUNK_LENGTH_HEADER_NAME, sb.toString());
HttpHeaders.setContentLength(response, totalSize);
if (HttpHeaders.isKeepAlive(request)) {
@@ -655,7 +524,7 @@ public class TajoPullServerService extends AbstractService {
writeFuture = ctx.write(response);
for (FileChunk chunk : file) {
- writeFuture = sendFile(ctx, chunk, request.getUri());
+ writeFuture = sendFile(ctx, chunk);
if (writeFuture == null) {
sendError(ctx, HttpResponseStatus.NOT_FOUND);
return;
@@ -676,53 +545,8 @@ public class TajoPullServerService extends AbstractService {
}
}
- /**
- * Upon a request from TajoWorker, this method clears index cache for fetching data of an execution block.
- * It is called whenever an execution block is completed.
- *
- * @param uri query URI which indicates the execution block id
- * @throws IOException
- * @throws InvalidURLException
- */
- private void clearIndexCache(String uri) throws IOException, InvalidURLException {
- // Simply parse the given uri
- String[] tokens = uri.split("=");
- if (tokens.length != 2 || !tokens[0].equals("ebid")) {
- throw new IllegalArgumentException("invalid params: " + uri);
- }
- ExecutionBlockId ebId = TajoIdUtils.createExecutionBlockId(tokens[1]);
- String queryId = ebId.getQueryId().toString();
- String ebSeqId = Integer.toString(ebId.getId());
- List<CacheKey> removed = new ArrayList<>();
- synchronized (indexReaderCache) {
- for (Entry<CacheKey, BSTIndexReader> e : indexReaderCache.asMap().entrySet()) {
- CacheKey key = e.getKey();
- if (key.queryId.equals(queryId) && key.ebSeqId.equals(ebSeqId)) {
- e.getValue().forceClose();
- removed.add(e.getKey());
- }
- }
- indexReaderCache.invalidateAll(removed);
- }
- removed.clear();
- synchronized (waitForRemove) {
- for (Entry<CacheKey, BSTIndexReader> e : waitForRemove.entrySet()) {
- CacheKey key = e.getKey();
- if (key.queryId.equals(queryId) && key.ebSeqId.equals(ebSeqId)) {
- e.getValue().forceClose();
- removed.add(e.getKey());
- }
- }
- for (CacheKey eachKey : removed) {
- waitForRemove.remove(eachKey);
- }
- }
- }
-
private ChannelFuture sendFile(ChannelHandlerContext ctx,
- FileChunk file,
- String requestUri) throws IOException {
- long startTime = System.currentTimeMillis();
+ FileChunk file) throws IOException {
RandomAccessFile spill = null;
ChannelFuture writeFuture;
try {
@@ -732,7 +556,7 @@ public class TajoPullServerService extends AbstractService {
file.startOffset(), file.length(), manageOsCache, readaheadLength,
readaheadPool, file.getFile().getAbsolutePath());
writeFuture = ctx.write(filePart);
- writeFuture.addListener(new FileCloseListener(filePart, requestUri, startTime, TajoPullServerService.this));
+ writeFuture.addListener(new FileCloseListener(filePart));
} else {
// HTTPS cannot be done with zero copy.
final FadvisedChunkedFile chunk = new FadvisedChunkedFile(spill,
@@ -763,9 +587,10 @@ public class TajoPullServerService extends AbstractService {
private void sendError(ChannelHandlerContext ctx, String message,
HttpResponseStatus status) {
- FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status,
- Unpooled.copiedBuffer(message, CharsetUtil.UTF_8));
+ ByteBuf content = Unpooled.copiedBuffer(message, CharsetUtil.UTF_8);
+ FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, status, content);
response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain; charset=UTF-8");
+ HttpHeaders.setContentLength(response, content.writerIndex());
// Close the connection as soon as the error message is sent.
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
@@ -782,12 +607,12 @@ public class TajoPullServerService extends AbstractService {
}
// Temporal space to wait for the completion of all index lookup operations
- private static final ConcurrentHashMap<CacheKey, BSTIndexReader> waitForRemove = new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<IndexCacheKey, BSTIndexReader> waitForRemove = new ConcurrentHashMap<>();
// RemovalListener is triggered when an item is removed from the index reader cache.
// It closes index readers when they are not used anymore.
// If they are still being used, they are moved to waitForRemove map to wait for other operations' completion.
- private static final RemovalListener<CacheKey, BSTIndexReader> removalListener = (removal) -> {
+ private final RemovalListener<IndexCacheKey, BSTIndexReader> removalListener = (removal) -> {
BSTIndexReader reader = removal.getValue();
if (reader.getReferenceNum() == 0) {
try {
@@ -800,180 +625,4 @@ public class TajoPullServerService extends AbstractService {
waitForRemove.put(removal.getKey(), reader);
}
};
-
- public static FileChunk getFileChunks(String queryId,
- String ebSeqId,
- Path outDir,
- String startKey,
- String endKey,
- boolean last) throws IOException, ExecutionException {
-
- BSTIndexReader idxReader = indexReaderCache.get(new CacheKey(outDir, queryId, ebSeqId));
- idxReader.retain();
-
- File data;
- long startOffset;
- long endOffset;
- try {
- if (LOG.isDebugEnabled()) {
- if (indexReaderCache.size() > lowCacheHitCheckThreshold && indexReaderCache.stats().hitRate() < 0.5) {
- LOG.debug("Too low cache hit rate: " + indexReaderCache.stats());
- }
- }
-
- Tuple indexedFirst = idxReader.getFirstKey();
- Tuple indexedLast = idxReader.getLastKey();
-
- if (indexedFirst == null && indexedLast == null) { // if # of rows is zero
- if (LOG.isDebugEnabled()) {
- LOG.debug("There is no contents");
- }
- return null;
- }
-
- byte[] startBytes = Base64.decodeBase64(startKey);
- byte[] endBytes = Base64.decodeBase64(endKey);
-
-
- Tuple start;
- Tuple end;
- Schema keySchema = idxReader.getKeySchema();
- RowStoreDecoder decoder = RowStoreUtil.createDecoder(keySchema);
-
- try {
- start = decoder.toTuple(startBytes);
- } catch (Throwable t) {
- throw new IllegalArgumentException("StartKey: " + startKey
- + ", decoded byte size: " + startBytes.length, t);
- }
-
- try {
- end = decoder.toTuple(endBytes);
- } catch (Throwable t) {
- throw new IllegalArgumentException("EndKey: " + endKey
- + ", decoded byte size: " + endBytes.length, t);
- }
-
- data = new File(URI.create(outDir.toUri() + "/output"));
- if (LOG.isDebugEnabled()) {
- LOG.debug("GET Request for " + data.getAbsolutePath() + " (start=" + start + ", end=" + end +
- (last ? ", last=true" : "") + ")");
- }
-
- TupleComparator comparator = idxReader.getComparator();
-
- if (comparator.compare(end, indexedFirst) < 0 ||
- comparator.compare(indexedLast, start) < 0) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Out of Scope (indexed data [" + indexedFirst + ", " + indexedLast +
- "], but request start:" + start + ", end: " + end);
- }
- return null;
- }
-
- try {
- idxReader.init();
- startOffset = idxReader.find(start);
- } catch (IOException ioe) {
- LOG.error("State Dump (the requested range: "
- + "[" + start + ", " + end + ")" + ", idx min: "
- + idxReader.getFirstKey() + ", idx max: "
- + idxReader.getLastKey());
- throw ioe;
- }
- try {
- endOffset = idxReader.find(end);
- if (endOffset == -1) {
- endOffset = idxReader.find(end, true);
- }
- } catch (IOException ioe) {
- LOG.error("State Dump (the requested range: "
- + "[" + start + ", " + end + ")" + ", idx min: "
- + idxReader.getFirstKey() + ", idx max: "
- + idxReader.getLastKey());
- throw ioe;
- }
-
- // if startOffset == -1 then case 2-1 or case 3
- if (startOffset == -1) { // this is a hack
- // if case 2-1 or case 3
- try {
- startOffset = idxReader.find(start, true);
- } catch (IOException ioe) {
- LOG.error("State Dump (the requested range: "
- + "[" + start + ", " + end + ")" + ", idx min: "
- + idxReader.getFirstKey() + ", idx max: "
- + idxReader.getLastKey());
- throw ioe;
- }
- }
-
- if (startOffset == -1) {
- throw new IllegalStateException("startOffset " + startOffset + " is negative \n" +
- "State Dump (the requested range: "
- + "[" + start + ", " + end + ")" + ", idx min: " + idxReader.getFirstKey() + ", idx max: "
- + idxReader.getLastKey());
- }
-
- // if greater than indexed values
- if (last || (endOffset == -1
- && comparator.compare(idxReader.getLastKey(), end) < 0)) {
- endOffset = data.length();
- }
- } finally {
- idxReader.release();
- }
-
- FileChunk chunk = new FileChunk(data, startOffset, endOffset - startOffset);
-
- if (LOG.isDebugEnabled()) LOG.debug("Retrieve File Chunk: " + chunk);
- return chunk;
- }
-
- public static List<String> splitMaps(List<String> mapq) {
- if (null == mapq) {
- return null;
- }
- final List<String> ret = new ArrayList<>();
- for (String s : mapq) {
- Collections.addAll(ret, s.split(","));
- }
- return ret;
- }
-
- public static Map<String, List<String>> decodeParams(String uri) {
- final Map<String, List<String>> params = new QueryStringDecoder(uri).parameters();
- final List<String> types = params.get("type");
- final List<String> qids = params.get("qid");
- final List<String> ebIds = params.get("sid");
- final List<String> partIds = params.get("p");
-
- if (types == null || ebIds == null || qids == null || partIds == null) {
- throw new IllegalArgumentException("invalid params. required :" + params);
- }
-
- if (qids.size() != 1 && types.size() != 1 || ebIds.size() != 1) {
- throw new IllegalArgumentException("invalid params. required :" + params);
- }
-
- return params;
- }
-
- public static Path getBaseOutputDir(String queryId, String executionBlockSequenceId) {
- Path workDir =
- StorageUtil.concatPath(
- queryId,
- "output",
- executionBlockSequenceId);
- return workDir;
- }
-
- public static Path getBaseInputDir(String queryId, String executionBlockId) {
- Path workDir =
- StorageUtil.concatPath(
- queryId,
- "in",
- executionBlockId);
- return workDir;
- }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/FileChunk.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/FileChunk.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/FileChunk.java
index 67cff21..c5f6a6a 100644
--- a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/FileChunk.java
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/FileChunk.java
@@ -36,7 +36,7 @@ public class FileChunk {
*/
private String ebId;
- public FileChunk(File file, long startOffset, long length) throws FileNotFoundException {
+ public FileChunk(File file, long startOffset, long length) {
this.file = file;
this.startOffset = startOffset;
this.length = length;
@@ -76,6 +76,6 @@ public class FileChunk {
public String toString() {
return " (start=" + startOffset() + ", length=" + length + ", fromRemote=" + fromRemote + ", ebId=" + ebId + ") "
- + file.getAbsolutePath();
+ + file.getAbsolutePath();
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/FileChunkMeta.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/FileChunkMeta.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/FileChunkMeta.java
new file mode 100644
index 0000000..3f6b3eb
--- /dev/null
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/FileChunkMeta.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.pullserver.retriever;
+
/**
 * Immutable description of a byte range within a file chunk: the start offset and
 * length of the range, plus the execution block id and task id it is associated with.
 */
public class FileChunkMeta {
  private final long startOffset;
  private final long length;
  private final String ebId;
  private final String taskId;

  /**
   * @param startOffset offset of the first byte of the range
   * @param length      number of bytes in the range
   * @param ebId        execution block id associated with this range
   * @param taskId      task id associated with this range
   */
  public FileChunkMeta(long startOffset, long length, String ebId, String taskId) {
    this.startOffset = startOffset;
    this.length = length;
    this.ebId = ebId;
    this.taskId = taskId;
  }

  /** @return the task id passed to the constructor */
  public String getTaskId() {
    return taskId;
  }

  /** @return the offset of the first byte of the range */
  public long getStartOffset() {
    return startOffset;
  }

  /** @return the number of bytes in the range */
  public long getLength() {
    return length;
  }

  /** @return the execution block id passed to the constructor */
  public String getEbId() {
    return ebId;
  }

  /**
   * @return a human-readable form, e.g. {@code "ebId: eb_1, taskId: t_1 (10, 20)"}
   */
  @Override
  public String toString() {
    return "ebId: " + ebId + ", taskId: " + taskId + " (" + startOffset + ", " + length + ")";
  }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/IndexCacheKey.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/IndexCacheKey.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/IndexCacheKey.java
new file mode 100644
index 0000000..2a71c65
--- /dev/null
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/IndexCacheKey.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.pullserver.retriever;
+
+import org.apache.hadoop.fs.Path;
+
+import java.util.Objects;
+
+public class IndexCacheKey {
+ private Path path;
+ private String queryId;
+ private String ebSeqId;
+
+ public IndexCacheKey(Path path, String queryId, String ebSeqId) {
+ this.path = path;
+ this.queryId = queryId;
+ this.ebSeqId = ebSeqId;
+ }
+
+ public Path getPath() {
+ return path;
+ }
+
+ public String getQueryId() {
+ return queryId;
+ }
+
+ public String getEbSeqId() {
+ return ebSeqId;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o instanceof IndexCacheKey) {
+ IndexCacheKey other = (IndexCacheKey) o;
+ return Objects.equals(this.path, other.path)
+ && Objects.equals(this.queryId, other.queryId)
+ && Objects.equals(this.ebSeqId, other.ebSeqId);
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(path, queryId, ebSeqId);
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-yarn/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-yarn/pom.xml b/tajo-yarn/pom.xml
new file mode 100644
index 0000000..70511a1
--- /dev/null
+++ b/tajo-yarn/pom.xml
@@ -0,0 +1,265 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>tajo-project</artifactId>
+ <groupId>org.apache.tajo</groupId>
+ <version>0.12.0-SNAPSHOT</version>
+ <relativePath>../tajo-project</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>tajo-yarn</artifactId>
+ <packaging>jar</packaging>
+ <name>Tajo Yarn</name>
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+ </properties>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.rat</groupId>
+ <artifactId>apache-rat-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>verify</phase>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-report-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>2.4.1</version>
+ <configuration>
+ <descriptorRefs>
+ <descriptorRef>jar-with-dependencies</descriptorRef>
+ </descriptorRefs>
+ </configuration>
+ <executions>
+ <execution>
+ <id>make-assembly</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-shuffle</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-api</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tajo</groupId>
+ <artifactId>tajo-common</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>org.iq80.snappy</groupId>
+ <artifactId>snappy</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-buffer</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tajo</groupId>
+ <artifactId>tajo-storage-common</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>net.minidev</groupId>
+ <artifactId>json-smart</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.tajo</groupId>
+ <artifactId>tajo-plan</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-buffer</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tajo</groupId>
+ <artifactId>tajo-storage-hdfs</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-storage-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-orc</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet-hadoop-bundle</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>trevni-avro</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>trevni-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-codec-http</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-codec</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-transport</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-buffer</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tajo</groupId>
+ <artifactId>tajo-pullserver</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.tajo</groupId>
+ <artifactId>tajo-rpc-protobuf</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-transport</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-codec-http</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ </dependencies>
+ <profiles>
+ <profile>
+ <id>docs</id>
+ <activation>
+ <activeByDefault>false</activeByDefault>
+ </activation>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-javadoc-plugin</artifactId>
+ <executions>
+ <execution>
+ <!-- build javadoc jars per jar for publishing to maven -->
+ <id>module-javadocs</id>
+ <phase>package</phase>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ <configuration>
+ <destDir>${project.build.directory}</destDir>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ <profile>
+ <id>src</id>
+ <activation>
+ <activeByDefault>false</activeByDefault>
+ </activation>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-source-plugin</artifactId>
+ <executions>
+ <execution>
+ <!-- builds source jars and attaches them to the project for publishing -->
+ <id>tajo-java-sources</id>
+ <phase>package</phase>
+ <goals>
+ <goal>jar-no-fork</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
+
+ <reporting>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-project-info-reports-plugin</artifactId>
+ <version>2.4</version>
+ <configuration>
+ <dependencyLocationsEnabled>false</dependencyLocationsEnabled>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-report-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </reporting>
+
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-yarn/src/main/java/org/apache/tajo/yarn/FadvisedChunkedFile.java
----------------------------------------------------------------------
diff --git a/tajo-yarn/src/main/java/org/apache/tajo/yarn/FadvisedChunkedFile.java b/tajo-yarn/src/main/java/org/apache/tajo/yarn/FadvisedChunkedFile.java
new file mode 100644
index 0000000..3c0a76f
--- /dev/null
+++ b/tajo-yarn/src/main/java/org/apache/tajo/yarn/FadvisedChunkedFile.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.yarn;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.ReadaheadPool;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.tajo.pullserver.PullServerUtil;
+import org.jboss.netty.handler.stream.ChunkedFile;
+
+import java.io.FileDescriptor;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+
+public class FadvisedChunkedFile extends ChunkedFile {
+
+ private static final Log LOG = LogFactory.getLog(FadvisedChunkedFile.class);
+
+ private final boolean manageOsCache;
+ private final int readaheadLength;
+ private final ReadaheadPool readaheadPool;
+ private final FileDescriptor fd;
+ private final String identifier;
+
+ private ReadaheadPool.ReadaheadRequest readaheadRequest;
+
+ public FadvisedChunkedFile(RandomAccessFile file, long position, long count,
+ int chunkSize, boolean manageOsCache, int readaheadLength,
+ ReadaheadPool readaheadPool, String identifier) throws IOException {
+ super(file, position, count, chunkSize);
+ this.manageOsCache = manageOsCache;
+ this.readaheadLength = readaheadLength;
+ this.readaheadPool = readaheadPool;
+ this.fd = file.getFD();
+ this.identifier = identifier;
+ }
+
+ @Override
+ public Object nextChunk() throws Exception {
+ if (PullServerUtil.isNativeIOPossible() && manageOsCache && readaheadPool != null) {
+ readaheadRequest = readaheadPool
+ .readaheadStream(identifier, fd, getCurrentOffset(), readaheadLength,
+ getEndOffset(), readaheadRequest);
+ }
+ return super.nextChunk();
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (readaheadRequest != null) {
+ readaheadRequest.cancel();
+ }
+ if (PullServerUtil.isNativeIOPossible() && manageOsCache && getEndOffset() - getStartOffset() > 0) {
+ try {
+ PullServerUtil.posixFadviseIfPossible(identifier,
+ fd,
+ getStartOffset(), getEndOffset() - getStartOffset(),
+ NativeIO.POSIX.POSIX_FADV_DONTNEED);
+ } catch (Throwable t) {
+ LOG.warn("Failed to manage OS cache for " + identifier, t);
+ }
+ }
+ super.close();
+ }
+}