You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2016/04/27 06:27:09 UTC
[2/4] tajo git commit: TAJO-2122: PullServer as an Auxiliary service
of Yarn.
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerUtil.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerUtil.java
index c90f1aa..75f6080 100644
--- a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerUtil.java
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerUtil.java
@@ -18,22 +18,47 @@
package org.apache.tajo.pullserver;
-import org.apache.commons.lang.reflect.MethodUtils;
+import com.google.common.base.Preconditions;
+import com.google.common.cache.LoadingCache;
+import com.google.gson.Gson;
+import io.netty.handler.codec.http.QueryStringDecoder;
+import org.apache.commons.codec.binary.Base64;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.pullserver.PullServerConstants.Param;
+import org.apache.tajo.pullserver.retriever.FileChunk;
+import org.apache.tajo.pullserver.retriever.FileChunkMeta;
+import org.apache.tajo.pullserver.retriever.IndexCacheKey;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.RowStoreUtil.RowStoreDecoder;
+import org.apache.tajo.storage.index.bst.BSTIndex.BSTIndexReader;
+import org.apache.tajo.util.Pair;
-import java.io.FileDescriptor;
-import java.lang.reflect.Method;
+import java.io.*;
+import java.net.URI;
+import java.net.URLEncoder;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
public class PullServerUtil {
private static final Log LOG = LogFactory.getLog(PullServerUtil.class);
private static boolean nativeIOPossible = false;
- private static Method posixFadviseIfPossible;
static {
- if (NativeIO.isAvailable() && loadNativeIO()) {
+ if (NativeIO.isAvailable()) {
nativeIOPossible = true;
} else {
LOG.warn("Unable to load hadoop nativeIO");
@@ -53,7 +78,7 @@ public class PullServerUtil {
long offset, long len, int flags) {
if (nativeIOPossible) {
try {
- posixFadviseIfPossible.invoke(null, identifier, fd, offset, len, flags);
+ NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(identifier, fd, offset, len, flags);
} catch (Throwable t) {
nativeIOPossible = false;
LOG.warn("Failed to manage OS cache for " + identifier, t);
@@ -61,30 +86,643 @@ public class PullServerUtil {
}
}
- /* load hadoop native method if possible */
- private static boolean loadNativeIO() {
- boolean loaded = true;
- if (nativeIOPossible) return loaded;
+ /**
+  * Builds the base output directory of an execution block, relative to the worker's
+  * temporary directory, by joining {@code queryId}, the literal {@code "output"} and
+  * {@code executionBlockSequenceId} through {@link StorageUtil#concatPath}.
+  *
+  * @param queryId query identifier used as the first path segment
+  * @param executionBlockSequenceId execution block identifier used as the last path segment
+  * @return the joined path
+  */
+ public static Path getBaseOutputDir(String queryId, String executionBlockSequenceId) {
+   final String outputSegment = "output";
+   return StorageUtil.concatPath(queryId, outputSegment, executionBlockSequenceId);
+ }
- Class[] parameters = {String.class, FileDescriptor.class, Long.TYPE, Long.TYPE, Integer.TYPE};
- try {
- Method getCacheManipulator = MethodUtils.getAccessibleMethod(NativeIO.POSIX.class, "getCacheManipulator", new Class[0]);
- Class posixClass;
- if (getCacheManipulator != null) {
- Object posix = MethodUtils.invokeStaticMethod(NativeIO.POSIX.class, "getCacheManipulator", null);
- posixClass = posix.getClass();
+ /**
+  * Builds the base input directory of an execution block by joining {@code queryId},
+  * the literal {@code "in"} and {@code executionBlockId} through
+  * {@link StorageUtil#concatPath}.
+  *
+  * @param queryId query identifier used as the first path segment
+  * @param executionBlockId execution block identifier used as the last path segment
+  * @return the joined path
+  */
+ public static Path getBaseInputDir(String queryId, String executionBlockId) {
+   final String inputSegment = "in";
+   return StorageUtil.concatPath(queryId, inputSegment, executionBlockId);
+ }
+
+ /**
+  * Flattens a list of comma-separated id strings into a single list of ids.
+  * Each element is split with {@link String#split(String)}, so trailing empty
+  * tokens are dropped while inner empty tokens are kept.
+  *
+  * @param mapq comma-separated id strings; may be {@code null}
+  * @return the individual ids in encounter order, or {@code null} when {@code mapq} is {@code null}
+  */
+ public static List<String> splitMaps(List<String> mapq) {
+   if (mapq == null) {
+     return null;
+   }
+   final List<String> ids = new ArrayList<>();
+   for (String commaSeparated : mapq) {
+     for (String id : commaSeparated.split(",")) {
+       ids.add(id);
+     }
+   }
+   return ids;
+ }
+
+
+ /**
+  * Tells whether a request type denotes a chunk request.
+  *
+  * @param requestType request type value; may be {@code null}
+  * @return {@code true} iff it equals {@link PullServerConstants#CHUNK_REQUEST_PARAM_STRING}
+  */
+ public static boolean isChunkRequest(String requestType) {
+   // Constant-first comparison: a null argument yields false instead of a NullPointerException.
+   return PullServerConstants.CHUNK_REQUEST_PARAM_STRING.equals(requestType);
+ }
+
+ /**
+  * Tells whether a request type denotes a meta request.
+  *
+  * @param requestType request type value; may be {@code null}
+  * @return {@code true} iff it equals {@link PullServerConstants#META_REQUEST_PARAM_STRING}
+  */
+ public static boolean isMetaRequest(String requestType) {
+   // Constant-first comparison: a null argument yields false instead of a NullPointerException.
+   return PullServerConstants.META_REQUEST_PARAM_STRING.equals(requestType);
+ }
+
+ /**
+  * Tells whether a shuffle type denotes a range shuffle.
+  *
+  * @param shuffleType shuffle type value; may be {@code null}
+  * @return {@code true} iff it equals {@link PullServerConstants#RANGE_SHUFFLE_PARAM_STRING}
+  */
+ public static boolean isRangeShuffle(String shuffleType) {
+   // Constant-first comparison: a null argument yields false instead of a NullPointerException.
+   return PullServerConstants.RANGE_SHUFFLE_PARAM_STRING.equals(shuffleType);
+ }
+
+ /**
+  * Tells whether a shuffle type denotes a hash shuffle or a scattered hash shuffle.
+  *
+  * @param shuffleType shuffle type value; may be {@code null}
+  * @return {@code true} iff it equals {@link PullServerConstants#HASH_SHUFFLE_PARAM_STRING}
+  *         or {@link PullServerConstants#SCATTERED_HASH_SHUFFLE_PARAM_STRING}
+  */
+ public static boolean isHashShuffle(String shuffleType) {
+   // Constant-first comparisons: a null argument yields false instead of a NullPointerException.
+   return PullServerConstants.HASH_SHUFFLE_PARAM_STRING.equals(shuffleType)
+       || PullServerConstants.SCATTERED_HASH_SHUFFLE_PARAM_STRING.equals(shuffleType);
+ }
+
+ /**
+  * Query-string parameters of a pull server request, keyed by parameter name.
+  * The map is populated from the URI with {@link QueryStringDecoder}; the typed
+  * accessors look values up by {@link Param}.
+  */
+ public static class PullServerParams extends HashMap<String, List<String>> {
+
+   public PullServerParams(URI uri) {
+     this(uri.toString());
+   }
+
+   public PullServerParams(String uri) {
+     super(new QueryStringDecoder(uri).parameters());
+   }
+
+   /** Returns whether a value list is registered under the key of {@code param}. */
+   public boolean contains(Param param) {
+     return containsKey(param.key());
+   }
+
+   /** Returns the value list registered under the key of {@code param}, or {@code null}. */
+   public List<String> get(Param param) {
+     return get(param.key());
+   }
+
+   /**
+    * Returns the only value of a mandatory parameter.
+    *
+    * @throws IllegalArgumentException if the parameter is absent or does not have exactly one value
+    */
+   private String checkAndGetFirstParam(Param param) {
+     final List<String> values = checkAndGet(param);
+     Preconditions.checkArgument(values.size() == 1, "Too many params: " + param.name());
+     return values.get(0);
+   }
+
+   /**
+    * Returns all values of a mandatory parameter.
+    *
+    * @throws IllegalArgumentException if the parameter is absent
+    */
+   private List<String> checkAndGet(Param param) {
+     Preconditions.checkArgument(contains(param), "Missing " + param.name());
+     return get(param);
+   }
+
+   /**
+    * Parses an optional numeric parameter.
+    *
+    * @return the parsed value, or {@code -1} when the parameter is absent or has more than one value
+    */
+   private long singleLongOrMinusOne(Param param) {
+     if (!contains(param)) {
+       return -1L;
+     }
+     final List<String> values = get(param);
+     if (values.size() != 1) {
+       return -1L;
+     }
+     return Long.parseLong(values.get(0));
+   }
+
+   public String requestType() {
+     return checkAndGetFirstParam(Param.REQUEST_TYPE);
+   }
+
+   public String shuffleType() {
+     return checkAndGetFirstParam(Param.SHUFFLE_TYPE);
+   }
+
+   public String queryId() {
+     return checkAndGetFirstParam(Param.QUERY_ID);
+   }
+
+   public String ebId() {
+     return checkAndGetFirstParam(Param.EB_ID);
+   }
+
+   /** Returns the requested offset, or {@code -1} when it is not given exactly once. */
+   public long offset() {
+     return singleLongOrMinusOne(Param.OFFSET);
+   }
+
+   /** Returns the requested length, or {@code -1} when it is not given exactly once. */
+   public long length() {
+     return singleLongOrMinusOne(Param.LENGTH);
+   }
+
+   public String startKey() {
+     return checkAndGetFirstParam(Param.START);
+   }
+
+   public String endKey() {
+     return checkAndGetFirstParam(Param.END);
+   }
+
+   /** Returns whether the {@link Param#FINAL} parameter is present, whatever its value. */
+   public boolean last() {
+     return contains(Param.FINAL);
+   }
+
+   public String partId() {
+     return checkAndGetFirstParam(Param.PART_ID);
+   }
+
+   public List<String> taskAttemptIds() {
+     return checkAndGet(Param.TASK_ID);
+   }
+ }
+
+ /**
+  * Builds the HTTP request URI(s) used to fetch shuffle data from a pull server.
+  *
+  * <p>Configure the request with the setters and call {@link #build(boolean)}. When task ids
+  * are included, the id list is spread over several URIs so that a single URI stays within
+  * {@code maxUrlLength} whenever the ids allow it.</p>
+  */
+ public static class PullServerRequestURIBuilder {
+   // "http://<addr>:<port>/?" prefix; build() works on a copy so it can be called repeatedly.
+   private final String baseUrl;
+   private String requestType;
+   private String shuffleType;
+   private String queryId;
+   private Integer ebId;
+   private Integer partId;
+   private List<Integer> taskIds;
+   private List<Integer> attemptIds;
+   private List<String> taskAttemptIds;
+   private Long offset;
+   private Long length;
+   private String startKeyBase64;
+   private String endKeyBase64;
+   private boolean last;
+   private final int maxUrlLength;
+
+   public PullServerRequestURIBuilder(String pullServerAddr, int pullServerPort, int maxUrlLength) {
+     this(pullServerAddr, Integer.toString(pullServerPort), maxUrlLength);
+   }
+
+   public PullServerRequestURIBuilder(String pullServerAddr, String pullServerPort, int maxUrlLength) {
+     this.baseUrl = "http://" + pullServerAddr + ":" + pullServerPort + "/?";
+     this.maxUrlLength = maxUrlLength;
+   }
+
+   /**
+    * Creates the request URI(s) for the configured parameters.
+    *
+    * @param includeTasks whether task attempt ids are appended; they are never appended
+    *                     for hash shuffles
+    * @return one URI when no task ids are appended; otherwise one URI per task-id chunk
+    *         (empty when there are no task ids)
+    * @throws IllegalStateException if a start key is set without an end key, or a length
+    *                               without an offset
+    */
+   public List<URI> build(boolean includeTasks) {
+     // Work on a local copy: appending to shared state would duplicate every parameter
+     // when build() is invoked more than once.
+     final StringBuilder url = new StringBuilder(baseUrl);
+     append(url, Param.REQUEST_TYPE, requestType);
+     append(url, Param.QUERY_ID, queryId);
+     append(url, Param.EB_ID, ebId);
+     append(url, Param.PART_ID, partId);
+     append(url, Param.SHUFFLE_TYPE, shuffleType);
+
+     if (startKeyBase64 != null) {
+       Preconditions.checkState(endKeyBase64 != null, "End key must be set together with the start key");
+       try {
+         append(url, Param.START, URLEncoder.encode(startKeyBase64, "utf-8"));
+         append(url, Param.END, URLEncoder.encode(endKeyBase64, "utf-8"));
+       } catch (UnsupportedEncodingException e) {
+         throw new RuntimeException(e);
+       }
+
+       if (last) {
+         append(url, Param.FINAL, Boolean.toString(last));
+       }
+     }
+
+     if (length != null) {
+       Preconditions.checkState(offset != null, "Offset must be set together with the length");
+       append(url, Param.OFFSET, offset.toString());
+       append(url, Param.LENGTH, length.toString());
+     }
+
+     final List<URI> results = new ArrayList<>();
+     if (!includeTasks || isHashShuffle(shuffleType)) {
+       results.add(URI.create(url.toString()));
} else {
- posixClass = NativeIO.POSIX.class;
+       url.append(Param.TASK_ID.key()).append("=");
+       for (String taskIdsParam : splitTaskIds(resolveTaskAttemptIds(), url.length())) {
+         results.add(URI.create(url + taskIdsParam));
+       }
+     }
+
+     return results;
+   }
+
+   /**
+    * Returns the explicitly configured task attempt ids, or derives {@code <taskId>_<attemptId>}
+    * strings from the parallel task id / attempt id lists.
+    */
+   private List<String> resolveTaskAttemptIds() {
+     if (taskAttemptIds != null) {
+       return taskAttemptIds;
+     }
+     // Sort task ids to increase cache hit in pull server
+     return IntStream.range(0, taskIds.size())
+         .mapToObj(i -> new Pair<>(taskIds.get(i), attemptIds.get(i)))
+         // Integer.compare avoids the overflow a subtraction-based comparator can hit.
+         .sorted((p1, p2) -> Integer.compare(p1.getFirst(), p2.getFirst()))
+         // In the case of hash shuffle each partition has single shuffle file per worker.
+         // TODO If file is large, consider multiple fetching(shuffle file can be split)
+         .filter(pair -> pair.getFirst() >= 0)
+         .map(pair -> pair.getFirst() + "_" + pair.getSecond())
+         .collect(Collectors.toList());
+   }
+
+   /**
+    * Groups ids into comma-separated chunks so that {@code prefixLength} plus a chunk does not
+    * exceed {@code maxUrlLength}; an id that cannot fit on its own still gets its own chunk.
+    * A long request URI may otherwise cause HTTP status 414 (Request-URI Too Long), see
+    * http://www.w3.org/Protocols/rfc2616/rfc2616-sec10.html#sec10.4.15
+    */
+   private List<String> splitTaskIds(List<String> ids, int prefixLength) {
+     final List<String> chunks = new ArrayList<>();
+     StringBuilder current = new StringBuilder();
+     for (String id : ids) {
+       // Look ahead (+1 for the separator) and flush before the limit is crossed. Chunks are
+       // flushed without a trailing comma and an empty chunk is never emitted.
+       if (current.length() > 0 && prefixLength + current.length() + 1 + id.length() > maxUrlLength) {
+         chunks.add(current.toString());
+         current = new StringBuilder();
+       }
+       if (current.length() > 0) {
+         current.append(',');
+       }
+       current.append(id);
+     }
+     // if the url params remain
+     if (current.length() > 0) {
+       chunks.add(current.toString());
+     }
+     return chunks;
+   }
+
+   /** Appends {@code <key>=<val>&} to the URL under construction. */
+   private static void append(StringBuilder url, Param key, Object val) {
+     url.append(key.key())
+         .append("=")
+         .append(val)
+         .append("&");
+   }
+
+   public PullServerRequestURIBuilder setRequestType(String type) {
+     this.requestType = type;
+     return this;
+   }
+
+   public PullServerRequestURIBuilder setShuffleType(String shuffleType) {
+     this.shuffleType = shuffleType;
+     return this;
+   }
+
+   public PullServerRequestURIBuilder setQueryId(String queryId) {
+     this.queryId = queryId;
+     return this;
+   }
+
+   public PullServerRequestURIBuilder setEbId(String ebId) {
+     this.ebId = Integer.parseInt(ebId);
+     return this;
+   }
+
+   public PullServerRequestURIBuilder setEbId(Integer ebId) {
+     this.ebId = ebId;
+     return this;
+   }
+
+   public PullServerRequestURIBuilder setPartId(String partId) {
+     this.partId = Integer.parseInt(partId);
+     return this;
+   }
+
+   public PullServerRequestURIBuilder setPartId(Integer partId) {
+     this.partId = partId;
+     return this;
+   }
+
+   public PullServerRequestURIBuilder setTaskIds(List<Integer> taskIds) {
+     this.taskIds = taskIds;
+     return this;
+   }
+
+   public PullServerRequestURIBuilder setAttemptIds(List<Integer> attemptIds) {
+     this.attemptIds = attemptIds;
+     return this;
+   }
+
+   /** Sets ready-made task attempt ids; when set, task ids and attempt ids are ignored. */
+   public PullServerRequestURIBuilder setTaskAttemptIds(List<String> taskAttemptIds) {
+     this.taskAttemptIds = taskAttemptIds;
+     return this;
+   }
+
+   public PullServerRequestURIBuilder setOffset(long offset) {
+     this.offset = offset;
+     return this;
+   }
+
+   public PullServerRequestURIBuilder setLength(long length) {
+     this.length = length;
+     return this;
+   }
+
+   public PullServerRequestURIBuilder setStartKeyBase64(String startKeyBase64) {
+     this.startKeyBase64 = startKeyBase64;
+     return this;
+   }
+
+   public PullServerRequestURIBuilder setEndKeyBase64(String endKeyBase64) {
+     this.endKeyBase64 = endKeyBase64;
+     return this;
+   }
+
+   public PullServerRequestURIBuilder setLastInclude(boolean last) {
+     this.last = last;
+     return this;
+   }
+ }
+
+ /**
+  * Tells whether shuffle data is served by a pull server outside the worker: either the
+  * pull server reports itself as standalone, or {@link ConfVars#YARN_SHUFFLE_SERVICE_ENABLED}
+  * is set in the configuration.
+  *
+  * @param conf configuration consulted only when the pull server is not standalone
+  * @return {@code true} when an external pull server service is in use
+  */
+ public static boolean useExternalPullServerService(TajoConf conf) {
+   // TODO: add more service types like mesos
+   if (TajoPullServerService.isStandalone()) {
+     return true;
+   }
+   return conf.getBoolVar(ConfVars.YARN_SHUFFLE_SERVICE_ENABLED);
+ }
+
+ /**
+  * Looks up the part of a task's range-shuffle output matching the requested key range and
+  * describes it as a {@link FileChunkMeta} (start offset, length, execution block id, task id).
+  *
+  * @return the chunk meta, or {@code null} when the search finds no matching part;
+  *         the length is not checked here, so callers filter out zero-length chunks
+  */
+ private static FileChunkMeta searchFileChunkMeta(String queryId,
+                                                  String ebSeqId,
+                                                  String taskId,
+                                                  Path outDir,
+                                                  String startKey,
+                                                  String endKey,
+                                                  boolean last,
+                                                  LoadingCache<IndexCacheKey, BSTIndexReader> indexReaderCache,
+                                                  int lowCacheHitCheckThreshold) throws IOException, ExecutionException {
+   final SearchResult found = searchCorrespondPart(queryId, ebSeqId, outDir, startKey, endKey, last,
+       indexReaderCache, lowCacheHitCheckThreshold);
+   if (found == null) {
+     return null;
+   }
+
+   final FileChunkMeta meta =
+       new FileChunkMeta(found.startOffset, found.endOffset - found.startOffset, ebSeqId, taskId);
+   if (LOG.isDebugEnabled()) {
+     LOG.debug("Retrieve File Chunk: " + meta);
+   }
+   return meta;
+ }
+
+ /**
+  * Looks up the part of a task's range-shuffle output matching the requested key range and
+  * wraps it as a {@link FileChunk} over the data file (start offset and length).
+  *
+  * @return the chunk, or {@code null} when the search finds no matching part
+  */
+ private static FileChunk searchFileChunk(String queryId,
+                                          String ebSeqId,
+                                          Path outDir,
+                                          String startKey,
+                                          String endKey,
+                                          boolean last,
+                                          LoadingCache<IndexCacheKey, BSTIndexReader> indexReaderCache,
+                                          int lowCacheHitCheckThreshold) throws IOException, ExecutionException {
+   final SearchResult found = searchCorrespondPart(queryId, ebSeqId, outDir, startKey, endKey, last,
+       indexReaderCache, lowCacheHitCheckThreshold);
+   if (found == null) {
+     return null;
+   }
+
+   final FileChunk fileChunk =
+       new FileChunk(found.data, found.startOffset, found.endOffset - found.startOffset);
+   if (LOG.isDebugEnabled()) {
+     LOG.debug("Retrieve File Chunk: " + fileChunk);
+   }
+   return fileChunk;
+ }
+
+ /** Outcome of an index search: the data file and the byte range {@code [startOffset, endOffset)} found in it. */
+ private static class SearchResult {
+   final File data;
+   final long startOffset;
+   final long endOffset;
+
+   public SearchResult(File dataFile, long start, long end) {
+     data = dataFile;
+     startOffset = start;
+     endOffset = end;
+   }
+ }
+
+ /**
+ * Finds the byte range of the data file under {@code outDir} that corresponds to the
+ * requested key range, using the BST index reader cached for (outDir, queryId, ebSeqId).
+ *
+ * @param startKey Base64-encoded start key, decoded with the index key schema
+ * @param endKey Base64-encoded end key, decoded with the index key schema
+ * @param last when true, the end offset is set to the length of the data file
+ * @param lowCacheHitCheckThreshold cache size above which a hit rate below 0.5 is reported at debug level
+ * @return the data file with its start and end offsets, or null when the index holds no keys
+ * or the requested range does not overlap the indexed key range
+ * @throws IllegalArgumentException if the start or end key cannot be decoded into a tuple
+ * @throws IllegalStateException if no start offset can be found for the start key
+ */
+ private static SearchResult searchCorrespondPart(String queryId,
+ String ebSeqId,
+ Path outDir,
+ String startKey,
+ String endKey,
+ boolean last,
+ LoadingCache<IndexCacheKey, BSTIndexReader> indexReaderCache,
+ int lowCacheHitCheckThreshold) throws IOException, ExecutionException {
+ BSTIndexReader idxReader = indexReaderCache.get(new IndexCacheKey(outDir, queryId, ebSeqId));
+ // NOTE(review): retain()/release() presumably keep the cached reader usable while this
+ // search runs - confirm against BSTIndexReader.
+ idxReader.retain();
+
+ File data;
+ long startOffset;
+ long endOffset;
+ try {
+ if (LOG.isDebugEnabled()) {
+ if (indexReaderCache.size() > lowCacheHitCheckThreshold && indexReaderCache.stats().hitRate() < 0.5) {
+ LOG.debug("Too low cache hit rate: " + indexReaderCache.stats());
+ }
+ }
+
+ Tuple indexedFirst = idxReader.getFirstKey();
+ Tuple indexedLast = idxReader.getLastKey();
+
+ if (indexedFirst == null && indexedLast == null) { // if # of rows is zero
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("There is no contents");
+ }
+ return null;
+ }
+
+ byte[] startBytes = Base64.decodeBase64(startKey);
+ byte[] endBytes = Base64.decodeBase64(endKey);
+
+
+ Tuple start;
+ Tuple end;
+ Schema keySchema = idxReader.getKeySchema();
+ RowStoreDecoder decoder = RowStoreUtil.createDecoder(keySchema);
+
+ // Any decoding failure is reported as an invalid argument, with the offending key attached.
+ try {
+ start = decoder.toTuple(startBytes);
+ } catch (Throwable t) {
+ throw new IllegalArgumentException("StartKey: " + startKey
+ + ", decoded byte size: " + startBytes.length, t);
+ }
+
+ try {
+ end = decoder.toTuple(endBytes);
+ } catch (Throwable t) {
+ throw new IllegalArgumentException("EndKey: " + endKey
+ + ", decoded byte size: " + endBytes.length, t);
}
- posixFadviseIfPossible = MethodUtils.getAccessibleMethod(posixClass, "posixFadviseIfPossible", parameters);
- } catch (Throwable e) {
- loaded = false;
- LOG.warn("Failed to access posixFadviseIfPossible :" + e.getMessage(), e);
+
+ // The data file is the "output" entry directly under outDir.
+ data = new File(URI.create(outDir.toUri() + "/output"));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("GET Request for " + data.getAbsolutePath() + " (start=" + start + ", end=" + end +
+ (last ? ", last=true" : "") + ")");
+ }
+
+ TupleComparator comparator = idxReader.getComparator();
+
+ // Nothing to return when the requested range lies entirely before or after the indexed keys.
+ if (comparator.compare(end, indexedFirst) < 0 ||
+ comparator.compare(indexedLast, start) < 0) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Out of Scope (indexed data [" + indexedFirst + ", " + indexedLast +
+ "], but request start:" + start + ", end: " + end);
+ }
+ return null;
+ }
+
+ try {
+ idxReader.init();
+ startOffset = idxReader.find(start);
+ } catch (IOException ioe) {
+ LOG.error("State Dump (the requested range: "
+ + "[" + start + ", " + end + ")" + ", idx min: "
+ + idxReader.getFirstKey() + ", idx max: "
+ + idxReader.getLastKey());
+ throw ioe;
+ }
+ // Look the end key up first with find(end); on a miss (-1) retry with find(end, true).
+ // NOTE(review): the boolean presumably selects a "next key" lookup - confirm against BSTIndexReader.find.
+ try {
+ endOffset = idxReader.find(end);
+ if (endOffset == -1) {
+ endOffset = idxReader.find(end, true);
+ }
+ } catch (IOException ioe) {
+ LOG.error("State Dump (the requested range: "
+ + "[" + start + ", " + end + ")" + ", idx min: "
+ + idxReader.getFirstKey() + ", idx max: "
+ + idxReader.getLastKey());
+ throw ioe;
+ }
+
+ // NOTE(review): "case 2-1" / "case 3" below refer to a case analysis that is not part of this code - confirm.
+ // if startOffset == -1 then case 2-1 or case 3
+ if (startOffset == -1) { // this is a hack
+ // if case 2-1 or case 3
+ try {
+ startOffset = idxReader.find(start, true);
+ } catch (IOException ioe) {
+ LOG.error("State Dump (the requested range: "
+ + "[" + start + ", " + end + ")" + ", idx min: "
+ + idxReader.getFirstKey() + ", idx max: "
+ + idxReader.getLastKey());
+ throw ioe;
+ }
+ }
+
+ if (startOffset == -1) {
+ throw new IllegalStateException("startOffset " + startOffset + " is negative \n" +
+ "State Dump (the requested range: "
+ + "[" + start + ", " + end + ")" + ", idx min: " + idxReader.getFirstKey() + ", idx max: "
+ + idxReader.getLastKey());
+ }
+
+ // Read up to the end of the data file when this is the last range, or when no end offset
+ // was found and the end key is greater than the last indexed key.
+ // if greater than indexed values
+ if (last || (endOffset == -1
+ && comparator.compare(idxReader.getLastKey(), end) < 0)) {
+ endOffset = data.length();
+ }
+ } finally {
+ idxReader.release();
}
- if (posixFadviseIfPossible == null) {
- loaded = false;
+ return new SearchResult(data, startOffset, endOffset);
+ }
+
+ /**
+  * Retrieve meta information of file chunks which correspond to the requested URI.
+  * Only meta information for the file chunks which has non-zero length are retrieved.
+  * Task outputs that do not exist locally are skipped with a warning.
+  *
+  * @param conf configuration handed to the local directory allocator
+  * @param lDirAlloc allocator used to check for and locate each task's output directory
+  * @param localFS file system used to qualify the located path
+  * @param params request parameters (task attempt ids, query id, execution block id, key range)
+  * @param gson serializer used to render each {@link FileChunkMeta} as JSON
+  * @param indexReaderCache cache of index readers used for the key range search
+  * @param lowCacheHitCheckThreshold cache size above which a low hit rate is reported
+  * @return one JSON string per task whose matching chunk has a positive length
+  * @throws IOException if locating the output or searching the index fails
+  * @throws ExecutionException if loading an index reader through the cache fails
+  */
+ public static List<String> getJsonMeta(final TajoConf conf,
+                                        final LocalDirAllocator lDirAlloc,
+                                        final FileSystem localFS,
+                                        final PullServerParams params,
+                                        final Gson gson,
+                                        final LoadingCache<IndexCacheKey, BSTIndexReader> indexReaderCache,
+                                        final int lowCacheHitCheckThreshold)
+     throws IOException, ExecutionException {
+   final List<String> taskIds = PullServerUtil.splitMaps(params.taskAttemptIds());
+   final String queryId = params.queryId();
+   final String ebId = params.ebId();
+   final Path queryBaseDir = PullServerUtil.getBaseOutputDir(queryId, ebId);
+   final List<String> jsonMetas = new ArrayList<>();
+
+   for (String taskId : taskIds) {
+     final String relativeOutput = StorageUtil.concatPath(queryBaseDir, taskId, "output").toString();
+     if (lDirAlloc.ifExists(relativeOutput, conf)) {
+       final Path located = localFS.makeQualified(lDirAlloc.getLocalPathToRead(relativeOutput, conf));
+       final FileChunkMeta meta = PullServerUtil.searchFileChunkMeta(queryId, ebId, taskId, located,
+           params.startKey(), params.endKey(), params.last(), indexReaderCache, lowCacheHitCheckThreshold);
+       if (meta != null && meta.getLength() > 0) {
+         jsonMetas.add(gson.toJson(meta, FileChunkMeta.class));
+       }
+     } else {
+       LOG.warn("Range shuffle - file not exist. " + relativeOutput);
+     }
+   }
+   return jsonMetas;
+ }
+
+ /**
+ * Retrieve file chunks which correspond to the requested URI.
+ * Only the file chunks which has non-zero length are retrieved.
+ *
+ * For a range shuffle, one chunk per requested task is looked up through the index; task
+ * outputs that do not exist locally are skipped with a warning. For a hash shuffle or a
+ * scattered hash shuffle, a single chunk over the partition file is returned, restricted to
+ * the requested offset and length when both are non-negative.
+ *
+ * @param conf configuration handed to the local directory allocator and used to resolve the
+ * parent directory of a hash-shuffle partition
+ * @param lDirAlloc allocator used to check for and locate the shuffle output
+ * @param localFS file system used to qualify the located path
+ * @param params request parameters (query id, shuffle type, execution block id, and either
+ * task attempt ids with a key range or a partition id with optional offset and length)
+ * @param indexReaderCache cache of index readers, used for range shuffles only
+ * @param lowCacheHitCheckThreshold cache size above which a low hit rate is reported
+ * @return the chunks whose length is greater than zero
+ * @throws IOException if locating the output or searching the index fails; in particular
+ * FileNotFoundException when the hash-shuffle partition file does not exist and
+ * EOFException when the start position is not smaller than the file length
+ * @throws ExecutionException if loading an index reader through the cache fails
+ * @throws IllegalArgumentException if the shuffle type is neither range nor hash
+ */
+ public static List<FileChunk> getFileChunks(final TajoConf conf,
+ final LocalDirAllocator lDirAlloc,
+ final FileSystem localFS,
+ final PullServerParams params,
+ final LoadingCache<IndexCacheKey, BSTIndexReader> indexReaderCache,
+ final int lowCacheHitCheckThreshold)
+ throws IOException, ExecutionException {
+ final List<FileChunk> chunks = new ArrayList<>();
+
+ final String queryId = params.queryId();
+ final String shuffleType = params.shuffleType();
+ final String sid = params.ebId();
+
+ // Both are -1 when the request does not carry exactly one value for them.
+ final long offset = params.offset();
+ final long length = params.length();
+
+ final Path queryBaseDir = PullServerUtil.getBaseOutputDir(queryId, sid);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("PullServer request param: shuffleType=" + shuffleType + ", sid=" + sid);
+
+ // the working dir of tajo worker for each query
+ LOG.debug("PullServer baseDir: " + conf.get(ConfVars.WORKER_TEMPORAL_DIR.varname) + "/" + queryBaseDir);
+ }
+
+ // if a stage requires a range shuffle
+ if (PullServerUtil.isRangeShuffle(shuffleType)) {
+ final List<String> taskIdList = params.taskAttemptIds();
+ final List<String> taskIds = PullServerUtil.splitMaps(taskIdList);
+
+ final String startKey = params.startKey();
+ final String endKey = params.endKey();
+ final boolean last = params.last();
+
+ long before = System.currentTimeMillis();
+ for (String eachTaskId : taskIds) {
+ Path outputPath = StorageUtil.concatPath(queryBaseDir, eachTaskId, "output");
+ if (!lDirAlloc.ifExists(outputPath.toString(), conf)) {
+ LOG.warn(outputPath + " does not exist.");
+ continue;
+ }
+ Path path = localFS.makeQualified(lDirAlloc.getLocalPathToRead(outputPath.toString(), conf));
+
+ FileChunk chunk = PullServerUtil.searchFileChunk(queryId, sid, path, startKey, endKey, last, indexReaderCache,
+ lowCacheHitCheckThreshold);
+ if (chunk != null) {
+ chunks.add(chunk);
+ }
+ }
+ long after = System.currentTimeMillis();
+ LOG.info("Index lookup time: " + (after - before) + " ms");
+
+ // if a stage requires a hash shuffle or a scattered hash shuffle
+ } else if (PullServerUtil.isHashShuffle(shuffleType)) {
+
+ final String partId = params.partId();
+ int partParentId = HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), conf);
+ Path partPath = StorageUtil.concatPath(queryBaseDir, "hash-shuffle", String.valueOf(partParentId), partId);
+ if (!lDirAlloc.ifExists(partPath.toString(), conf)) {
+ throw new FileNotFoundException(partPath.toString());
+ }
+
+ Path path = localFS.makeQualified(lDirAlloc.getLocalPathToRead(partPath.toString(), conf));
+
+ File file = new File(path.toUri());
+ // Serve the whole file unless both offset and length were given.
+ long startPos = (offset >= 0 && length >= 0) ? offset : 0;
+ long readLen = (offset >= 0 && length >= 0) ? length : file.length();
+
+ if (startPos >= file.length()) {
+ String errorMessage = "Start pos[" + startPos + "] great than file length [" + file.length() + "]";
+ throw new EOFException(errorMessage);
+ }
+ FileChunk chunk = new FileChunk(file, startPos, readLen);
+ chunks.add(chunk);
+ } else {
+ throw new IllegalArgumentException(shuffleType);
}
- return loaded;
+ return chunks.stream().filter(c -> c.length() > 0).collect(Collectors.toList());
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServer.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServer.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServer.java
index 4609712..069e660 100644
--- a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServer.java
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServer.java
@@ -54,11 +54,6 @@ public class TajoPullServer extends CompositeService {
start();
}
- public void start() {
- super.start();
-
- }
-
public static void main(String[] args) throws Exception {
StringUtils.startupShutdownMessage(TajoPullServerService.PullServer.class, args, LOG);
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
index cbeba52..aa16f87 100644
--- a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
@@ -22,8 +22,9 @@ import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalListener;
-import com.google.common.collect.Lists;
+import com.google.gson.Gson;
import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.group.ChannelGroup;
@@ -31,12 +32,13 @@ import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.*;
+import io.netty.handler.codec.http.HttpHeaders.Names;
+import io.netty.handler.codec.http.HttpHeaders.Values;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.GlobalEventExecutor;
-import org.apache.commons.codec.binary.Base64;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -56,42 +58,34 @@ import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
import org.apache.hadoop.security.ssl.SSLFactory;
import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.catalog.Schema;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.exception.InvalidURLException;
+import org.apache.tajo.exception.TajoInternalError;
+import org.apache.tajo.pullserver.PullServerUtil.PullServerParams;
import org.apache.tajo.pullserver.retriever.FileChunk;
+import org.apache.tajo.pullserver.retriever.IndexCacheKey;
import org.apache.tajo.rpc.NettyUtils;
-import org.apache.tajo.storage.*;
-import org.apache.tajo.storage.RowStoreUtil.RowStoreDecoder;
import org.apache.tajo.storage.index.bst.BSTIndex;
import org.apache.tajo.storage.index.bst.BSTIndex.BSTIndexReader;
-import org.apache.tajo.unit.StorageUnit;
import org.apache.tajo.util.TajoIdUtils;
import java.io.*;
import java.net.InetSocketAddress;
-import java.net.URI;
-import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+
+import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
public class TajoPullServerService extends AbstractService {
private static final Log LOG = LogFactory.getLog(TajoPullServerService.class);
- public static final String SHUFFLE_MANAGE_OS_CACHE = "tajo.pullserver.manage.os.cache";
- public static final boolean DEFAULT_SHUFFLE_MANAGE_OS_CACHE = true;
-
- public static final String SHUFFLE_READAHEAD_BYTES = "tajo.pullserver.readahead.bytes";
- public static final int DEFAULT_SHUFFLE_READAHEAD_BYTES = 4 * 1024 * 1024;
-
private int port;
private ServerBootstrap selector;
private final ChannelGroup accepted = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
@@ -99,7 +93,6 @@ public class TajoPullServerService extends AbstractService {
private int sslFileBufferSize;
private int maxUrlLength;
- private ApplicationId appId;
private FileSystem localFS;
/**
@@ -110,62 +103,14 @@ public class TajoPullServerService extends AbstractService {
private int readaheadLength;
private ReadaheadPool readaheadPool = ReadaheadPool.getInstance();
- public static final String PULLSERVER_SERVICEID = "tajo.pullserver";
-
- private static final Map<String,String> userRsrc =
- new ConcurrentHashMap<>();
- private String userName;
-
- private static LoadingCache<CacheKey, BSTIndexReader> indexReaderCache = null;
- private static int lowCacheHitCheckThreshold;
-
- public static final String SUFFLE_SSL_FILE_BUFFER_SIZE_KEY =
- "tajo.pullserver.ssl.file.buffer.size";
-
- public static final int DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE = 60 * 1024;
+ private LoadingCache<IndexCacheKey, BSTIndexReader> indexReaderCache = null;
+ private int lowCacheHitCheckThreshold;
private static final boolean STANDALONE;
- private static final AtomicIntegerFieldUpdater<ProcessingStatus> SLOW_FILE_UPDATER;
- private static final AtomicIntegerFieldUpdater<ProcessingStatus> REMAIN_FILE_UPDATER;
-
- public static final String CHUNK_LENGTH_HEADER_NAME = "c";
-
- static class CacheKey {
- private Path path;
- private String queryId;
- private String ebSeqId;
-
- public CacheKey(Path path, String queryId, String ebSeqId) {
- this.path = path;
- this.queryId = queryId;
- this.ebSeqId = ebSeqId;
- }
-
- @Override
- public boolean equals(Object o) {
- if (o instanceof CacheKey) {
- CacheKey other = (CacheKey) o;
- return Objects.equals(this.path, other.path)
- && Objects.equals(this.queryId, other.queryId)
- && Objects.equals(this.ebSeqId, other.ebSeqId);
- }
- return false;
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(path, queryId, ebSeqId);
- }
- }
-
static {
- /* AtomicIntegerFieldUpdater can save the memory usage instead of AtomicInteger instance */
- SLOW_FILE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ProcessingStatus.class, "numSlowFile");
- REMAIN_FILE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ProcessingStatus.class, "remainFiles");
-
- String standalone = System.getenv("TAJO_PULLSERVER_STANDALONE");
- STANDALONE = !StringUtils.isEmpty(standalone) && standalone.equalsIgnoreCase("true");
+ // Standalone mode is on only when the environment variable is set to "true" (case-insensitive);
+ // Boolean.parseBoolean yields false for an unset or empty variable.
+ STANDALONE = Boolean.parseBoolean(System.getenv(PullServerConstants.PULLSERVER_STANDALONE_ENV_KEY));
}
@Metrics(name="PullServerShuffleMetrics", about="PullServer output metrics", context="tajo")
@@ -193,7 +138,7 @@ public class TajoPullServerService extends AbstractService {
final ShuffleMetrics metrics;
+ /**
+ * Creates the service under the name {@code PullServerConstants.PULLSERVER_SERVICE_NAME} and
+ * registers its shuffle metrics with the given metrics system.
+ *
+ * @param ms metrics system the {@code ShuffleMetrics} source is registered with
+ */
TajoPullServerService(MetricsSystem ms) {
- super("httpshuffle");
+ super(PullServerConstants.PULLSERVER_SERVICE_NAME);
metrics = ms.register(new ShuffleMetrics());
}
@@ -202,58 +147,33 @@ public class TajoPullServerService extends AbstractService {
this(DefaultMetricsSystem.instance());
}
- public void initApp(String user, ApplicationId appId, ByteBuffer secret) {
- // TODO these bytes should be versioned
- // TODO: Once SHuffle is out of NM, this can use MR APIs
- this.appId = appId;
- this.userName = user;
- userRsrc.put(appId.toString(), user);
- }
-
- public void stopApp(ApplicationId appId) {
- userRsrc.remove(appId.toString());
- }
-
+ // TODO change AbstractService to throw InterruptedException
@Override
- public void init(Configuration conf) {
- try {
- manageOsCache = conf.getBoolean(SHUFFLE_MANAGE_OS_CACHE,
- DEFAULT_SHUFFLE_MANAGE_OS_CACHE);
-
- readaheadLength = conf.getInt(SHUFFLE_READAHEAD_BYTES,
- DEFAULT_SHUFFLE_READAHEAD_BYTES);
+ public void serviceInit(Configuration conf) throws Exception {
+ if (!(conf instanceof TajoConf)) {
+ throw new IllegalArgumentException("Configuration must be a TajoConf instance");
+ }
+ TajoConf tajoConf = (TajoConf) conf;
- int workerNum = conf.getInt("tajo.shuffle.rpc.server.worker-thread-num",
- Runtime.getRuntime().availableProcessors() * 2);
+ manageOsCache = tajoConf.getBoolean(PullServerConstants.SHUFFLE_MANAGE_OS_CACHE,
+ PullServerConstants.DEFAULT_SHUFFLE_MANAGE_OS_CACHE);
- selector = NettyUtils.createServerBootstrap("TajoPullServerService", workerNum)
- .option(ChannelOption.TCP_NODELAY, true)
- .childOption(ChannelOption.ALLOCATOR, NettyUtils.ALLOCATOR)
- .childOption(ChannelOption.TCP_NODELAY, true);
+ readaheadLength = tajoConf.getInt(PullServerConstants.SHUFFLE_READAHEAD_BYTES,
+ PullServerConstants.DEFAULT_SHUFFLE_READAHEAD_BYTES);
- localFS = new LocalFileSystem();
+ int workerNum = tajoConf.getIntVar(ConfVars.SHUFFLE_RPC_SERVER_WORKER_THREAD_NUM);
- maxUrlLength = conf.getInt(ConfVars.PULLSERVER_FETCH_URL_MAX_LENGTH.name(),
- ConfVars.PULLSERVER_FETCH_URL_MAX_LENGTH.defaultIntVal);
+ selector = NettyUtils.createServerBootstrap("TajoPullServerService", workerNum)
+ .option(ChannelOption.TCP_NODELAY, true)
+ .childOption(ChannelOption.ALLOCATOR, NettyUtils.ALLOCATOR)
+ .childOption(ChannelOption.TCP_NODELAY, true);
- conf.setInt(TajoConf.ConfVars.PULLSERVER_PORT.varname
- , conf.getInt(TajoConf.ConfVars.PULLSERVER_PORT.varname, TajoConf.ConfVars.PULLSERVER_PORT.defaultIntVal));
- super.init(conf);
- LOG.info("Tajo PullServer initialized: readaheadLength=" + readaheadLength);
- } catch (Throwable t) {
- LOG.error(t, t);
- }
- }
+ localFS = new LocalFileSystem();
- // TODO change AbstractService to throw InterruptedException
- @Override
- public void serviceInit(Configuration conf) throws Exception {
- if (!(conf instanceof TajoConf)) {
- throw new IllegalArgumentException("Configuration must be a TajoConf instance");
- }
+ maxUrlLength = tajoConf.getIntVar(ConfVars.PULLSERVER_FETCH_URL_MAX_LENGTH);
+ LOG.info("Tajo PullServer initialized: readaheadLength=" + readaheadLength);
ServerBootstrap bootstrap = selector.clone();
- TajoConf tajoConf = (TajoConf)conf;
try {
channelInitializer = new HttpChannelInitializer(tajoConf);
} catch (Exception ex) {
@@ -263,7 +183,7 @@ public class TajoPullServerService extends AbstractService {
.channel(NioServerSocketChannel.class);
port = tajoConf.getIntVar(ConfVars.PULLSERVER_PORT);
- ChannelFuture future = bootstrap.bind(new InetSocketAddress(port))
+ ChannelFuture future = bootstrap.bind(port)
.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE)
.syncUninterruptibly();
@@ -272,8 +192,8 @@ public class TajoPullServerService extends AbstractService {
tajoConf.set(ConfVars.PULLSERVER_PORT.varname, Integer.toString(port));
LOG.info(getName() + " listening on port " + port);
- sslFileBufferSize = conf.getInt(SUFFLE_SSL_FILE_BUFFER_SIZE_KEY,
- DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE);
+ sslFileBufferSize = conf.getInt(PullServerConstants.SUFFLE_SSL_FILE_BUFFER_SIZE_KEY,
+ PullServerConstants.DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE);
int cacheSize = tajoConf.getIntVar(ConfVars.PULLSERVER_CACHE_SIZE);
int cacheTimeout = tajoConf.getIntVar(ConfVars.PULLSERVER_CACHE_TIMEOUT);
@@ -283,10 +203,10 @@ public class TajoPullServerService extends AbstractService {
.expireAfterWrite(cacheTimeout, TimeUnit.MINUTES)
.removalListener(removalListener)
.build(
- new CacheLoader<CacheKey, BSTIndexReader>() {
+ new CacheLoader<IndexCacheKey, BSTIndexReader>() {
@Override
- public BSTIndexReader load(CacheKey key) throws Exception {
- return new BSTIndex(tajoConf).getIndexReader(new Path(key.path, "index"));
+ public BSTIndexReader load(IndexCacheKey key) throws Exception {
+ return new BSTIndex(tajoConf).getIndexReader(new Path(key.getPath(), "index"));
}
}
);
@@ -353,29 +273,31 @@ public class TajoPullServerService extends AbstractService {
}
@Override
- public void stop() {
- try {
- accepted.close();
- if (selector != null) {
- if (selector.group() != null) {
- selector.group().shutdownGracefully();
- }
- if (selector.childGroup() != null) {
- selector.childGroup().shutdownGracefully();
- }
+ public void serviceStop() throws Exception {
+ accepted.close();
+ if (selector != null) {
+ if (selector.group() != null) {
+ selector.group().shutdownGracefully();
}
-
- if (channelInitializer != null) {
- channelInitializer.destroy();
+ if (selector.childGroup() != null) {
+ selector.childGroup().shutdownGracefully();
}
+ }
- localFS.close();
- indexReaderCache.invalidateAll();
- } catch (Throwable t) {
- LOG.error(t, t);
- } finally {
- super.stop();
+ if (channelInitializer != null) {
+ channelInitializer.destroy();
}
+
+ localFS.close();
+ indexReaderCache.invalidateAll();
+
+ super.serviceStop();
+ }
+
+ public List<FileChunk> getFileChunks(TajoConf conf, LocalDirAllocator lDirAlloc, PullServerParams params)
+ throws IOException, ExecutionException {
+ return PullServerUtil.getFileChunks(conf, lDirAlloc, localFS, params, indexReaderCache,
+ lowCacheHitCheckThreshold);
}
class HttpChannelInitializer extends ChannelInitializer<SocketChannel> {
@@ -385,8 +307,7 @@ public class TajoPullServerService extends AbstractService {
public HttpChannelInitializer(TajoConf conf) throws Exception {
PullServer = new PullServer(conf);
- if (conf.getBoolean(ConfVars.SHUFFLE_SSL_ENABLED_KEY.varname,
- ConfVars.SHUFFLE_SSL_ENABLED_KEY.defaultBoolVal)) {
+ if (conf.getBoolVar(ConfVars.SHUFFLE_SSL_ENABLED_KEY)) {
sslFactory = new SSLFactory(SSLFactory.Mode.SERVER, conf);
sslFactory.init();
}
@@ -417,72 +338,13 @@ public class TajoPullServerService extends AbstractService {
}
}
-
- Map<String, ProcessingStatus> processingStatusMap = new ConcurrentHashMap<>();
-
- public void completeFileChunk(FileRegion filePart,
- String requestUri,
- long startTime) {
- ProcessingStatus status = processingStatusMap.get(requestUri);
- if (status != null) {
- status.decrementRemainFiles(filePart, startTime);
- }
- }
-
- class ProcessingStatus {
- String requestUri;
- int numFiles;
- long startTime;
- long makeFileListTime;
- long minTime = Long.MAX_VALUE;
- long maxTime;
- volatile int numSlowFile;
- volatile int remainFiles;
-
- public ProcessingStatus(String requestUri) {
- this.requestUri = requestUri;
- this.startTime = System.currentTimeMillis();
- }
-
- public void setNumFiles(int numFiles) {
- this.numFiles = numFiles;
- this.remainFiles = numFiles;
- }
-
- public void decrementRemainFiles(FileRegion filePart, long fileStartTime) {
- long fileSendTime = System.currentTimeMillis() - fileStartTime;
-
- if (fileSendTime > maxTime) {
- maxTime = fileSendTime;
- }
- if (fileSendTime < minTime) {
- minTime = fileSendTime;
- }
-
- if (fileSendTime > 20 * 1000) {
- LOG.warn("Sending data takes too long. " + fileSendTime + "ms elapsed, " +
- "length:" + (filePart.count() - filePart.position()) + ", URI:" + requestUri);
- SLOW_FILE_UPDATER.compareAndSet(this, numSlowFile, numSlowFile + 1);
- }
-
- REMAIN_FILE_UPDATER.compareAndSet(this, remainFiles, remainFiles - 1);
- if (REMAIN_FILE_UPDATER.get(this) <= 0) {
- processingStatusMap.remove(requestUri);
- if(LOG.isDebugEnabled()) {
- LOG.debug("PullServer processing status: totalTime=" + (System.currentTimeMillis() - startTime) + " ms, "
- + "makeFileListTime=" + makeFileListTime + " ms, minTime=" + minTime + " ms, maxTime=" + maxTime + " ms, "
- + "numFiles=" + numFiles + ", numSlowFile=" + numSlowFile);
- }
- }
- }
- }
-
@ChannelHandler.Sharable
class PullServer extends SimpleChannelInboundHandler<FullHttpRequest> {
private final TajoConf conf;
private final LocalDirAllocator lDirAlloc =
new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);
+ private final Gson gson = new Gson();
public PullServer(TajoConf conf) throws IOException {
this.conf = conf;
@@ -512,7 +374,7 @@ public class TajoPullServerService extends AbstractService {
}
if (request.getMethod() == HttpMethod.DELETE) {
- HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NO_CONTENT);
+ HttpResponse response = new DefaultHttpResponse(HTTP_1_1, HttpResponseStatus.NO_CONTENT);
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
clearIndexCache(request.getUri());
@@ -523,110 +385,117 @@ public class TajoPullServerService extends AbstractService {
}
// Parsing the URL into key-values
- Map<String, List<String>> params = null;
try {
- params = decodeParams(request.getUri());
+ final PullServerParams params = new PullServerParams(request.getUri());
+ if (PullServerUtil.isChunkRequest(params.requestType())) {
+ handleChunkRequest(ctx, request, params);
+ } else {
+ handleMetaRequest(ctx, request, params);
+ }
} catch (Throwable e) {
- LOG.error("Failed to decode uri " + request.getUri());
+ LOG.error("Failed to handle request " + request.getUri());
sendError(ctx, e.getMessage(), HttpResponseStatus.BAD_REQUEST);
return;
}
+ }
- ProcessingStatus processingStatus = new ProcessingStatus(request.getUri());
- processingStatusMap.put(request.getUri(), processingStatus);
-
- String partId = params.get("p").get(0);
- String queryId = params.get("qid").get(0);
- String shuffleType = params.get("type").get(0);
- String sid = params.get("sid").get(0);
-
- final List<String> taskIdList = params.get("ta");
- final List<String> offsetList = params.get("offset");
- final List<String> lengthList = params.get("length");
-
- long offset = (offsetList != null && !offsetList.isEmpty()) ? Long.parseLong(offsetList.get(0)) : -1L;
- long length = (lengthList != null && !lengthList.isEmpty()) ? Long.parseLong(lengthList.get(0)) : -1L;
-
- List<String> taskIds = splitMaps(taskIdList);
-
- Path queryBaseDir = getBaseOutputDir(queryId, sid);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("PullServer request param: shuffleType=" + shuffleType + ", sid=" + sid + ", partId=" + partId
- + ", taskIds=" + taskIdList);
-
- // the working dir of tajo worker for each query
- LOG.debug("PullServer baseDir: " + conf.get(ConfVars.WORKER_TEMPORAL_DIR.varname) + "/" + queryBaseDir);
+ /**
+ * Upon a request from TajoWorker, this method clears index cache for fetching data of an execution block.
+ * It is called whenever an execution block is completed.
+ *
+ * @param uri query URI which indicates the execution block id
+ * @throws IOException
+ * @throws InvalidURLException
+ */
+ public void clearIndexCache(String uri)
+ throws IOException, InvalidURLException {
+ // Simply parse the given uri
+ String[] tokens = uri.split("=");
+ if (tokens.length != 2 || !tokens[0].equals("ebid")) {
+ throw new IllegalArgumentException("invalid params: " + uri);
}
-
- final List<FileChunk> chunks = Lists.newArrayList();
-
- // if a stage requires a range shuffle
- if (shuffleType.equals("r")) {
- final String startKey = params.get("start").get(0);
- final String endKey = params.get("end").get(0);
- final boolean last = params.get("final") != null;
-
- long before = System.currentTimeMillis();
- for (String eachTaskId : taskIds) {
- Path outputPath = StorageUtil.concatPath(queryBaseDir, eachTaskId, "output");
- if (!lDirAlloc.ifExists(outputPath.toString(), conf)) {
- LOG.warn(outputPath + "does not exist.");
- continue;
- }
- Path path = localFS.makeQualified(lDirAlloc.getLocalPathToRead(outputPath.toString(), conf));
-
- FileChunk chunk;
- try {
- chunk = getFileChunks(queryId, sid, path, startKey, endKey, last);
- } catch (Throwable t) {
- LOG.error("ERROR Request: " + request.getUri(), t);
- sendError(ctx, "Cannot get file chunks to be sent", HttpResponseStatus.BAD_REQUEST);
- return;
+ ExecutionBlockId ebId = TajoIdUtils.createExecutionBlockId(tokens[1]);
+ String queryId = ebId.getQueryId().toString();
+ String ebSeqId = Integer.toString(ebId.getId());
+ List<IndexCacheKey> removed = new ArrayList<>();
+ synchronized (indexReaderCache) {
+ for (Entry<IndexCacheKey, BSTIndexReader> e : indexReaderCache.asMap().entrySet()) {
+ IndexCacheKey key = e.getKey();
+ if (key.getQueryId().equals(queryId) && key.getEbSeqId().equals(ebSeqId)) {
+ e.getValue().forceClose();
+ removed.add(e.getKey());
}
- if (chunk != null) {
- chunks.add(chunk);
+ }
+ indexReaderCache.invalidateAll(removed);
+ }
+ removed.clear();
+ synchronized (waitForRemove) {
+ for (Entry<IndexCacheKey, BSTIndexReader> e : waitForRemove.entrySet()) {
+ IndexCacheKey key = e.getKey();
+ if (key.getQueryId().equals(queryId) && key.getEbSeqId().equals(ebSeqId)) {
+ e.getValue().forceClose();
+ removed.add(e.getKey());
}
}
- long after = System.currentTimeMillis();
- LOG.info("Index lookup time: " + (after - before) + " ms");
-
- // if a stage requires a hash shuffle or a scattered hash shuffle
- } else if (shuffleType.equals("h") || shuffleType.equals("s")) {
- int partParentId = HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), conf);
- Path partPath = StorageUtil.concatPath(queryBaseDir, "hash-shuffle", String.valueOf(partParentId), partId);
- if (!lDirAlloc.ifExists(partPath.toString(), conf)) {
- LOG.warn("Partition shuffle file not exists: " + partPath);
- sendError(ctx, HttpResponseStatus.NO_CONTENT);
- return;
+ for (IndexCacheKey eachKey : removed) {
+ waitForRemove.remove(eachKey);
}
+ }
+ }
+
+ private void handleMetaRequest(ChannelHandlerContext ctx, FullHttpRequest request, final PullServerParams params)
+ throws IOException, ExecutionException {
+ final List<String> jsonMetas;
+ try {
+ jsonMetas = PullServerUtil.getJsonMeta(conf, lDirAlloc, localFS, params, gson, indexReaderCache,
+ lowCacheHitCheckThreshold);
+ } catch (FileNotFoundException e) {
+ sendError(ctx, e.getMessage(), HttpResponseStatus.NO_CONTENT);
+ return;
+ } catch (IOException | IllegalArgumentException e) { // IOException, EOFException, IllegalArgumentException
+ sendError(ctx, e.getMessage(), HttpResponseStatus.BAD_REQUEST);
+ return;
+ } catch (ExecutionException e) {
+ // There are some problems in index cache
+ throw new TajoInternalError(e.getCause());
+ }
- Path path = localFS.makeQualified(lDirAlloc.getLocalPathToRead(partPath.toString(), conf));
+ FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, HttpResponseStatus.OK,
+ Unpooled.copiedBuffer(gson.toJson(jsonMetas), CharsetUtil.UTF_8));
+ response.headers().set(Names.CONTENT_TYPE, "application/json; charset=UTF-8");
+ HttpHeaders.setContentLength(response, response.content().readableBytes());
+ if (HttpHeaders.isKeepAlive(request)) {
+ response.headers().set(Names.CONNECTION, Values.KEEP_ALIVE);
+ }
+ ChannelFuture writeFuture = ctx.writeAndFlush(response);
- File file = new File(path.toUri());
- long startPos = (offset >= 0 && length >= 0) ? offset : 0;
- long readLen = (offset >= 0 && length >= 0) ? length : file.length();
+ // Decide whether to close the connection or not.
+ if (!HttpHeaders.isKeepAlive(request)) {
+ // Close the connection when the whole content is written out.
+ writeFuture.addListener(ChannelFutureListener.CLOSE);
+ }
+ }
- if (startPos >= file.length()) {
- String errorMessage = "Start pos[" + startPos + "] great than file length [" + file.length() + "]";
- LOG.error(errorMessage);
- sendError(ctx, errorMessage, HttpResponseStatus.BAD_REQUEST);
- return;
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("RequestURL: " + request.getUri() + ", fileLen=" + file.length());
- }
- FileChunk chunk = new FileChunk(file, startPos, readLen);
- chunks.add(chunk);
- } else {
- LOG.error("Unknown shuffle type: " + shuffleType);
- sendError(ctx, "Unknown shuffle type:" + shuffleType, HttpResponseStatus.BAD_REQUEST);
+ private void handleChunkRequest(ChannelHandlerContext ctx, FullHttpRequest request, final PullServerParams params)
+ throws IOException {
+ final List<FileChunk> chunks;
+ try {
+ chunks = PullServerUtil.getFileChunks(conf, lDirAlloc, localFS, params, indexReaderCache,
+ lowCacheHitCheckThreshold);
+ } catch (FileNotFoundException e) {
+ sendError(ctx, e.getMessage(), HttpResponseStatus.NO_CONTENT);
+ return;
+ } catch (IOException | IllegalArgumentException e) { // IOException, EOFException, IllegalArgumentException
+ sendError(ctx, e.getMessage(), HttpResponseStatus.BAD_REQUEST);
return;
+ } catch (ExecutionException e) {
+ // There are some problems in index cache
+ throw new TajoInternalError(e.getCause());
}
// Write the content.
if (chunks.size() == 0) {
- HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NO_CONTENT);
+ HttpResponse response = new DefaultHttpResponse(HTTP_1_1, HttpResponseStatus.NO_CONTENT);
if (!HttpHeaders.isKeepAlive(request)) {
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
@@ -637,7 +506,7 @@ public class TajoPullServerService extends AbstractService {
} else {
FileChunk[] file = chunks.toArray(new FileChunk[chunks.size()]);
ChannelFuture writeFuture = null;
- HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
+ HttpResponse response = new DefaultHttpResponse(HTTP_1_1, HttpResponseStatus.OK);
long totalSize = 0;
StringBuilder sb = new StringBuilder();
for (FileChunk chunk : file) {
@@ -645,7 +514,7 @@ public class TajoPullServerService extends AbstractService {
sb.append(Long.toString(chunk.length())).append(",");
}
sb.deleteCharAt(sb.length() - 1);
- HttpHeaders.addHeader(response, CHUNK_LENGTH_HEADER_NAME, sb.toString());
+ HttpHeaders.addHeader(response, PullServerConstants.CHUNK_LENGTH_HEADER_NAME, sb.toString());
HttpHeaders.setContentLength(response, totalSize);
if (HttpHeaders.isKeepAlive(request)) {
@@ -655,7 +524,7 @@ public class TajoPullServerService extends AbstractService {
writeFuture = ctx.write(response);
for (FileChunk chunk : file) {
- writeFuture = sendFile(ctx, chunk, request.getUri());
+ writeFuture = sendFile(ctx, chunk);
if (writeFuture == null) {
sendError(ctx, HttpResponseStatus.NOT_FOUND);
return;
@@ -676,53 +545,8 @@ public class TajoPullServerService extends AbstractService {
}
}
- /**
- * Upon a request from TajoWorker, this method clears index cache for fetching data of an execution block.
- * It is called whenever an execution block is completed.
- *
- * @param uri query URI which indicates the execution block id
- * @throws IOException
- * @throws InvalidURLException
- */
- private void clearIndexCache(String uri) throws IOException, InvalidURLException {
- // Simply parse the given uri
- String[] tokens = uri.split("=");
- if (tokens.length != 2 || !tokens[0].equals("ebid")) {
- throw new IllegalArgumentException("invalid params: " + uri);
- }
- ExecutionBlockId ebId = TajoIdUtils.createExecutionBlockId(tokens[1]);
- String queryId = ebId.getQueryId().toString();
- String ebSeqId = Integer.toString(ebId.getId());
- List<CacheKey> removed = new ArrayList<>();
- synchronized (indexReaderCache) {
- for (Entry<CacheKey, BSTIndexReader> e : indexReaderCache.asMap().entrySet()) {
- CacheKey key = e.getKey();
- if (key.queryId.equals(queryId) && key.ebSeqId.equals(ebSeqId)) {
- e.getValue().forceClose();
- removed.add(e.getKey());
- }
- }
- indexReaderCache.invalidateAll(removed);
- }
- removed.clear();
- synchronized (waitForRemove) {
- for (Entry<CacheKey, BSTIndexReader> e : waitForRemove.entrySet()) {
- CacheKey key = e.getKey();
- if (key.queryId.equals(queryId) && key.ebSeqId.equals(ebSeqId)) {
- e.getValue().forceClose();
- removed.add(e.getKey());
- }
- }
- for (CacheKey eachKey : removed) {
- waitForRemove.remove(eachKey);
- }
- }
- }
-
private ChannelFuture sendFile(ChannelHandlerContext ctx,
- FileChunk file,
- String requestUri) throws IOException {
- long startTime = System.currentTimeMillis();
+ FileChunk file) throws IOException {
RandomAccessFile spill = null;
ChannelFuture writeFuture;
try {
@@ -732,7 +556,7 @@ public class TajoPullServerService extends AbstractService {
file.startOffset(), file.length(), manageOsCache, readaheadLength,
readaheadPool, file.getFile().getAbsolutePath());
writeFuture = ctx.write(filePart);
- writeFuture.addListener(new FileCloseListener(filePart, requestUri, startTime, TajoPullServerService.this));
+ writeFuture.addListener(new FileCloseListener(filePart));
} else {
// HTTPS cannot be done with zero copy.
final FadvisedChunkedFile chunk = new FadvisedChunkedFile(spill,
@@ -763,9 +587,10 @@ public class TajoPullServerService extends AbstractService {
private void sendError(ChannelHandlerContext ctx, String message,
HttpResponseStatus status) {
- FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status,
- Unpooled.copiedBuffer(message, CharsetUtil.UTF_8));
+ ByteBuf content = Unpooled.copiedBuffer(message, CharsetUtil.UTF_8);
+ FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, status, content);
response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain; charset=UTF-8");
+ HttpHeaders.setContentLength(response, content.writerIndex());
// Close the connection as soon as the error message is sent.
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
@@ -782,12 +607,12 @@ public class TajoPullServerService extends AbstractService {
}
// Temporal space to wait for the completion of all index lookup operations
- private static final ConcurrentHashMap<CacheKey, BSTIndexReader> waitForRemove = new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<IndexCacheKey, BSTIndexReader> waitForRemove = new ConcurrentHashMap<>();
// RemovalListener is triggered when an item is removed from the index reader cache.
// It closes index readers when they are not used anymore.
// If they are still being used, they are moved to waitForRemove map to wait for other operations' completion.
- private static final RemovalListener<CacheKey, BSTIndexReader> removalListener = (removal) -> {
+ private final RemovalListener<IndexCacheKey, BSTIndexReader> removalListener = (removal) -> {
BSTIndexReader reader = removal.getValue();
if (reader.getReferenceNum() == 0) {
try {
@@ -800,180 +625,4 @@ public class TajoPullServerService extends AbstractService {
waitForRemove.put(removal.getKey(), reader);
}
};
-
- public static FileChunk getFileChunks(String queryId,
- String ebSeqId,
- Path outDir,
- String startKey,
- String endKey,
- boolean last) throws IOException, ExecutionException {
-
- BSTIndexReader idxReader = indexReaderCache.get(new CacheKey(outDir, queryId, ebSeqId));
- idxReader.retain();
-
- File data;
- long startOffset;
- long endOffset;
- try {
- if (LOG.isDebugEnabled()) {
- if (indexReaderCache.size() > lowCacheHitCheckThreshold && indexReaderCache.stats().hitRate() < 0.5) {
- LOG.debug("Too low cache hit rate: " + indexReaderCache.stats());
- }
- }
-
- Tuple indexedFirst = idxReader.getFirstKey();
- Tuple indexedLast = idxReader.getLastKey();
-
- if (indexedFirst == null && indexedLast == null) { // if # of rows is zero
- if (LOG.isDebugEnabled()) {
- LOG.debug("There is no contents");
- }
- return null;
- }
-
- byte[] startBytes = Base64.decodeBase64(startKey);
- byte[] endBytes = Base64.decodeBase64(endKey);
-
-
- Tuple start;
- Tuple end;
- Schema keySchema = idxReader.getKeySchema();
- RowStoreDecoder decoder = RowStoreUtil.createDecoder(keySchema);
-
- try {
- start = decoder.toTuple(startBytes);
- } catch (Throwable t) {
- throw new IllegalArgumentException("StartKey: " + startKey
- + ", decoded byte size: " + startBytes.length, t);
- }
-
- try {
- end = decoder.toTuple(endBytes);
- } catch (Throwable t) {
- throw new IllegalArgumentException("EndKey: " + endKey
- + ", decoded byte size: " + endBytes.length, t);
- }
-
- data = new File(URI.create(outDir.toUri() + "/output"));
- if (LOG.isDebugEnabled()) {
- LOG.debug("GET Request for " + data.getAbsolutePath() + " (start=" + start + ", end=" + end +
- (last ? ", last=true" : "") + ")");
- }
-
- TupleComparator comparator = idxReader.getComparator();
-
- if (comparator.compare(end, indexedFirst) < 0 ||
- comparator.compare(indexedLast, start) < 0) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Out of Scope (indexed data [" + indexedFirst + ", " + indexedLast +
- "], but request start:" + start + ", end: " + end);
- }
- return null;
- }
-
- try {
- idxReader.init();
- startOffset = idxReader.find(start);
- } catch (IOException ioe) {
- LOG.error("State Dump (the requested range: "
- + "[" + start + ", " + end + ")" + ", idx min: "
- + idxReader.getFirstKey() + ", idx max: "
- + idxReader.getLastKey());
- throw ioe;
- }
- try {
- endOffset = idxReader.find(end);
- if (endOffset == -1) {
- endOffset = idxReader.find(end, true);
- }
- } catch (IOException ioe) {
- LOG.error("State Dump (the requested range: "
- + "[" + start + ", " + end + ")" + ", idx min: "
- + idxReader.getFirstKey() + ", idx max: "
- + idxReader.getLastKey());
- throw ioe;
- }
-
- // if startOffset == -1 then case 2-1 or case 3
- if (startOffset == -1) { // this is a hack
- // if case 2-1 or case 3
- try {
- startOffset = idxReader.find(start, true);
- } catch (IOException ioe) {
- LOG.error("State Dump (the requested range: "
- + "[" + start + ", " + end + ")" + ", idx min: "
- + idxReader.getFirstKey() + ", idx max: "
- + idxReader.getLastKey());
- throw ioe;
- }
- }
-
- if (startOffset == -1) {
- throw new IllegalStateException("startOffset " + startOffset + " is negative \n" +
- "State Dump (the requested range: "
- + "[" + start + ", " + end + ")" + ", idx min: " + idxReader.getFirstKey() + ", idx max: "
- + idxReader.getLastKey());
- }
-
- // if greater than indexed values
- if (last || (endOffset == -1
- && comparator.compare(idxReader.getLastKey(), end) < 0)) {
- endOffset = data.length();
- }
- } finally {
- idxReader.release();
- }
-
- FileChunk chunk = new FileChunk(data, startOffset, endOffset - startOffset);
-
- if (LOG.isDebugEnabled()) LOG.debug("Retrieve File Chunk: " + chunk);
- return chunk;
- }
-
- public static List<String> splitMaps(List<String> mapq) {
- if (null == mapq) {
- return null;
- }
- final List<String> ret = new ArrayList<>();
- for (String s : mapq) {
- Collections.addAll(ret, s.split(","));
- }
- return ret;
- }
-
- public static Map<String, List<String>> decodeParams(String uri) {
- final Map<String, List<String>> params = new QueryStringDecoder(uri).parameters();
- final List<String> types = params.get("type");
- final List<String> qids = params.get("qid");
- final List<String> ebIds = params.get("sid");
- final List<String> partIds = params.get("p");
-
- if (types == null || ebIds == null || qids == null || partIds == null) {
- throw new IllegalArgumentException("invalid params. required :" + params);
- }
-
- if (qids.size() != 1 && types.size() != 1 || ebIds.size() != 1) {
- throw new IllegalArgumentException("invalid params. required :" + params);
- }
-
- return params;
- }
-
- public static Path getBaseOutputDir(String queryId, String executionBlockSequenceId) {
- Path workDir =
- StorageUtil.concatPath(
- queryId,
- "output",
- executionBlockSequenceId);
- return workDir;
- }
-
- public static Path getBaseInputDir(String queryId, String executionBlockId) {
- Path workDir =
- StorageUtil.concatPath(
- queryId,
- "in",
- executionBlockId);
- return workDir;
- }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/FileChunk.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/FileChunk.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/FileChunk.java
index 67cff21..c5f6a6a 100644
--- a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/FileChunk.java
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/FileChunk.java
@@ -36,7 +36,7 @@ public class FileChunk {
*/
private String ebId;
- public FileChunk(File file, long startOffset, long length) throws FileNotFoundException {
+ public FileChunk(File file, long startOffset, long length) {
this.file = file;
this.startOffset = startOffset;
this.length = length;
@@ -76,6 +76,6 @@ public class FileChunk {
public String toString() {
return " (start=" + startOffset() + ", length=" + length + ", fromRemote=" + fromRemote + ", ebId=" + ebId + ") "
- + file.getAbsolutePath();
+ + file.getAbsolutePath();
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/FileChunkMeta.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/FileChunkMeta.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/FileChunkMeta.java
new file mode 100644
index 0000000..3f6b3eb
--- /dev/null
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/FileChunkMeta.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.pullserver.retriever;
+
+/**
+ * Immutable value object holding the metadata of one file chunk: its start offset, its length,
+ * and the execution block id and task id it is tagged with.
+ *
+ * NOTE(review): offset and length are presumably byte counts, as in {@code FileChunk} -- confirm.
+ */
+public class FileChunkMeta {
+  private final long startOffset;
+  private final long length;
+  private final String ebId;
+  private final String taskId;
+
+  /**
+   * @param startOffset start offset of the chunk
+   * @param length length of the chunk
+   * @param ebId id of the execution block the chunk is tagged with
+   * @param taskId id of the task the chunk is tagged with
+   */
+  public FileChunkMeta(long startOffset, long length, String ebId, String taskId) {
+    this.startOffset = startOffset;
+    this.length = length;
+    this.ebId = ebId;
+    this.taskId = taskId;
+  }
+
+  public String getTaskId() {
+    return taskId;
+  }
+
+  public long getStartOffset() {
+    return startOffset;
+  }
+
+  public long getLength() {
+    return length;
+  }
+
+  public String getEbId() {
+    return ebId;
+  }
+
+  /** Renders this object as {@code ebId: <ebId>, taskId: <taskId> (<startOffset>, <length>)}. */
+  @Override
+  public String toString() {
+    return "ebId: " + ebId + ", taskId: " + taskId + " (" + startOffset + ", " + length + ")";
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/IndexCacheKey.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/IndexCacheKey.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/IndexCacheKey.java
new file mode 100644
index 0000000..2a71c65
--- /dev/null
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/IndexCacheKey.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.pullserver.retriever;
+
+import org.apache.hadoop.fs.Path;
+
+import java.util.Objects;
+
+/**
+ * Composite key made of a {@link Path}, a query id and an {@code ebSeqId}
+ * string. Two keys are equal when all three components are equal, and the hash
+ * code is derived from the same three components.
+ *
+ * <p>The fields are {@code final} so that the values feeding
+ * {@link #hashCode()} cannot be reassigned after the key has been stored in a
+ * hash-based structure.
+ */
+public class IndexCacheKey {
+  private final Path path;
+  private final String queryId;
+  private final String ebSeqId;
+
+  /**
+   * Stores the given components as-is; {@code null} values are accepted and
+   * handled by {@link #equals(Object)} and {@link #hashCode()}.
+   *
+   * @param path value reported by {@link #getPath()}
+   * @param queryId value reported by {@link #getQueryId()}
+   * @param ebSeqId value reported by {@link #getEbSeqId()}
+   */
+  public IndexCacheKey(Path path, String queryId, String ebSeqId) {
+    this.path = path;
+    this.queryId = queryId;
+    this.ebSeqId = ebSeqId;
+  }
+
+  /** @return the path given at construction */
+  public Path getPath() {
+    return path;
+  }
+
+  /** @return the query id given at construction */
+  public String getQueryId() {
+    return queryId;
+  }
+
+  /** @return the eb sequence id given at construction */
+  public String getEbSeqId() {
+    return ebSeqId;
+  }
+
+  /**
+   * @param o object to compare with
+   * @return {@code true} if {@code o} is an {@code IndexCacheKey} whose path,
+   *         query id and eb sequence id all equal this key's
+   */
+  @Override
+  public boolean equals(Object o) {
+    // Same instance: skip the component-wise comparison.
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof IndexCacheKey)) {
+      return false;
+    }
+    IndexCacheKey other = (IndexCacheKey) o;
+    return Objects.equals(this.path, other.path)
+        && Objects.equals(this.queryId, other.queryId)
+        && Objects.equals(this.ebSeqId, other.ebSeqId);
+  }
+
+  /** @return a hash combining path, query id and eb sequence id */
+  @Override
+  public int hashCode() {
+    return Objects.hash(path, queryId, ebSeqId);
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-yarn/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-yarn/pom.xml b/tajo-yarn/pom.xml
new file mode 100644
index 0000000..70511a1
--- /dev/null
+++ b/tajo-yarn/pom.xml
@@ -0,0 +1,265 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent> <!-- Inherits from the tajo-project POM found at ../tajo-project -->
+ <artifactId>tajo-project</artifactId>
+ <groupId>org.apache.tajo</groupId>
+ <version>0.12.0-SNAPSHOT</version>
+ <relativePath>../tajo-project</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>tajo-yarn</artifactId> <!-- groupId and version are not declared here, so they come from the parent -->
+ <packaging>jar</packaging>
+ <name>Tajo Yarn</name>
+ <properties> <!-- Source files and generated reports both use UTF-8 -->
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+ </properties>
+
+ <build> <!-- Build plugins for the tajo-yarn module -->
+ <plugins>
+ <plugin> <!-- Declared without local configuration or version -->
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ </plugin>
+ <plugin> <!-- Runs the apache-rat-plugin "check" goal in the verify phase -->
+ <groupId>org.apache.rat</groupId>
+ <artifactId>apache-rat-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>verify</phase>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-report-plugin</artifactId>
+ </plugin>
+ <plugin> <!-- No groupId is given here; Maven falls back to org.apache.maven.plugins for plugins -->
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>2.4.1</version>
+ <configuration>
+ <descriptorRefs>
+ <descriptorRef>jar-with-dependencies</descriptorRef> <!-- Built-in descriptor: one jar holding this module plus its dependencies -->
+ </descriptorRefs>
+ </configuration>
+ <executions>
+ <execution> <!-- Binds assembly:single to the package phase -->
+ <id>make-assembly</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+ <dependencies> <!-- Three Hadoop artifacts in provided scope, then four Tajo modules with exclusions; no versions are declared in this file -->
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-shuffle</artifactId>
+ <scope>provided</scope> <!-- provided: needed to compile, expected to be supplied by the runtime environment -->
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-api</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency> <!-- tajo-common without snappy, commons-logging-api and netty-buffer -->
+ <groupId>org.apache.tajo</groupId>
+ <artifactId>tajo-common</artifactId>
+ <exclusions> <!-- NOTE(review): presumably excluded to keep the jar-with-dependencies small or avoid classpath clashes; confirm -->
+ <exclusion>
+ <groupId>org.iq80.snappy</groupId>
+ <artifactId>snappy</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-buffer</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency> <!-- tajo-storage-common without json-smart, tajo-plan and netty-buffer -->
+ <groupId>org.apache.tajo</groupId>
+ <artifactId>tajo-storage-common</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>net.minidev</groupId>
+ <artifactId>json-smart</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.tajo</groupId>
+ <artifactId>tajo-plan</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-buffer</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency> <!-- tajo-storage-hdfs without the Hive, Parquet, Trevni and io.netty artifacts listed below -->
+ <groupId>org.apache.tajo</groupId>
+ <artifactId>tajo-storage-hdfs</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-storage-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-orc</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet-hadoop-bundle</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>trevni-avro</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>trevni-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-codec-http</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-codec</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-transport</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-buffer</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency> <!-- tajo-pullserver without tajo-rpc-protobuf, netty-transport and netty-codec-http -->
+ <groupId>org.apache.tajo</groupId>
+ <artifactId>tajo-pullserver</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.tajo</groupId>
+ <artifactId>tajo-rpc-protobuf</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-transport</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-codec-http</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ </dependencies>
+ <profiles> <!-- Two opt-in profiles: "docs" builds a javadoc jar, "src" builds a sources jar -->
+ <profile>
+ <id>docs</id>
+ <activation>
+ <activeByDefault>false</activeByDefault> <!-- Off unless the profile is activated explicitly -->
+ </activation>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-javadoc-plugin</artifactId>
+ <executions>
+ <execution>
+ <!-- build javadoc jars per jar for publishing to maven -->
+ <id>module-javadocs</id>
+ <phase>package</phase>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ <configuration>
+ <destDir>${project.build.directory}</destDir> <!-- destDir is set to this module's build directory -->
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ <profile>
+ <id>src</id>
+ <activation>
+ <activeByDefault>false</activeByDefault> <!-- Off unless the profile is activated explicitly -->
+ </activation>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-source-plugin</artifactId>
+ <executions>
+ <execution>
+ <!-- builds source jars and attaches them to the project for publishing -->
+ <id>tajo-java-sources</id>
+ <phase>package</phase>
+ <goals>
+ <goal>jar-no-fork</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
+
+ <reporting> <!-- Report plugins: project-info reports and the surefire test report -->
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-project-info-reports-plugin</artifactId>
+ <version>2.4</version>
+ <configuration>
+ <dependencyLocationsEnabled>false</dependencyLocationsEnabled> <!-- Turns off the dependency locations part of the report -->
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-report-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </reporting>
+
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/73ac4b87/tajo-yarn/src/main/java/org/apache/tajo/yarn/FadvisedChunkedFile.java
----------------------------------------------------------------------
diff --git a/tajo-yarn/src/main/java/org/apache/tajo/yarn/FadvisedChunkedFile.java b/tajo-yarn/src/main/java/org/apache/tajo/yarn/FadvisedChunkedFile.java
new file mode 100644
index 0000000..3c0a76f
--- /dev/null
+++ b/tajo-yarn/src/main/java/org/apache/tajo/yarn/FadvisedChunkedFile.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.yarn;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.ReadaheadPool;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.tajo.pullserver.PullServerUtil;
+import org.jboss.netty.handler.stream.ChunkedFile;
+
+import java.io.FileDescriptor;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+
+/**
+ * A {@link ChunkedFile} that additionally issues readahead requests before
+ * each chunk is produced and, on close, advises the OS that the transferred
+ * range is no longer needed ({@code POSIX_FADV_DONTNEED}).
+ *
+ * <p>Both behaviours are only active when {@code manageOsCache} is set and
+ * {@link PullServerUtil#isNativeIOPossible()} returns {@code true}; readahead
+ * additionally requires a non-null {@link ReadaheadPool}.
+ */
+public class FadvisedChunkedFile extends ChunkedFile {
+
+  private static final Log LOG = LogFactory.getLog(FadvisedChunkedFile.class);
+
+  private final boolean manageOsCache;
+  private final int readaheadLength;
+  private final ReadaheadPool readaheadPool;
+  private final FileDescriptor fd;
+  private final String identifier;
+
+  // Most recent readahead request; handed back to the pool on the next call.
+  private ReadaheadPool.ReadaheadRequest readaheadRequest;
+
+  /**
+   * @param file file to read from; its descriptor is captured for the
+   *        readahead and fadvise calls
+   * @param position forwarded unchanged to the {@link ChunkedFile} constructor
+   * @param count forwarded unchanged to the {@link ChunkedFile} constructor
+   * @param chunkSize forwarded unchanged to the {@link ChunkedFile} constructor
+   * @param manageOsCache whether readahead and fadvise should be attempted
+   * @param readaheadLength length passed to each readahead request
+   * @param readaheadPool pool used to submit readahead requests; may be
+   *        {@code null}, in which case no readahead is issued
+   * @param identifier name passed to the readahead and fadvise calls and used
+   *        in the warning logged when fadvise fails
+   * @throws IOException if the superclass constructor or
+   *         {@link RandomAccessFile#getFD()} fails
+   */
+  public FadvisedChunkedFile(RandomAccessFile file, long position, long count,
+                             int chunkSize, boolean manageOsCache, int readaheadLength,
+                             ReadaheadPool readaheadPool, String identifier) throws IOException {
+    super(file, position, count, chunkSize);
+    this.manageOsCache = manageOsCache;
+    this.readaheadLength = readaheadLength;
+    this.readaheadPool = readaheadPool;
+    this.fd = file.getFD();
+    this.identifier = identifier;
+  }
+
+  /**
+   * Submits a readahead request starting at the current offset (when enabled)
+   * and then returns the next chunk from the superclass.
+   */
+  @Override
+  public Object nextChunk() throws Exception {
+    if (PullServerUtil.isNativeIOPossible() && manageOsCache && readaheadPool != null) {
+      readaheadRequest = readaheadPool
+          .readaheadStream(identifier, fd, getCurrentOffset(), readaheadLength,
+              getEndOffset(), readaheadRequest);
+    }
+    return super.nextChunk();
+  }
+
+  /**
+   * Cancels any outstanding readahead request, advises the OS to drop the
+   * range between the start and end offsets from its cache (when enabled and
+   * the range is non-empty), and closes the underlying file.
+   *
+   * <p>A failure of the fadvise call is logged as a warning and otherwise
+   * ignored. The superclass {@code close()} is invoked from a {@code finally}
+   * block so the file is closed even if an earlier cleanup step throws.
+   */
+  @Override
+  public void close() throws Exception {
+    try {
+      if (readaheadRequest != null) {
+        readaheadRequest.cancel();
+      }
+      if (PullServerUtil.isNativeIOPossible() && manageOsCache && getEndOffset() - getStartOffset() > 0) {
+        try {
+          PullServerUtil.posixFadviseIfPossible(identifier,
+              fd,
+              getStartOffset(), getEndOffset() - getStartOffset(),
+              NativeIO.POSIX.POSIX_FADV_DONTNEED);
+        } catch (Throwable t) {
+          // Cache management is best-effort; never let it prevent the close.
+          LOG.warn("Failed to manage OS cache for " + identifier, t);
+        }
+      }
+    } finally {
+      super.close();
+    }
+  }
+}