Posted to commits@tajo.apache.org by ji...@apache.org on 2015/12/24 10:18:24 UTC
[1/2] tajo git commit: TAJO-1950: Query master uses too much memory during range shuffle.
Repository: tajo
Updated Branches:
refs/heads/master e8ee7f2bf -> 1f9ae1da0
http://git-wip-us.apache.org/repos/asf/tajo/blob/1f9ae1da/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
index 74805ce..6d9639c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
@@ -25,17 +25,19 @@ import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.tajo.TajoProtos;
import org.apache.tajo.TajoProtos.TaskAttemptState;
import org.apache.tajo.TaskAttemptId;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableDesc;
-import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.engine.planner.physical.PhysicalExec;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.engine.query.TaskRequest;
@@ -53,6 +55,7 @@ import org.apache.tajo.plan.serder.PlanProto.ShuffleType;
import org.apache.tajo.plan.util.PlannerUtil;
import org.apache.tajo.pullserver.TajoPullServerService;
import org.apache.tajo.pullserver.retriever.FileChunk;
+import org.apache.tajo.querymaster.Repartitioner;
import org.apache.tajo.rpc.NullCallback;
import org.apache.tajo.storage.*;
import org.apache.tajo.storage.fragment.FileFragment;
@@ -92,11 +95,13 @@ public class TaskImpl implements Task {
private long endTime;
private List<FileChunk> localChunks;
+ private List<FileChunk> remoteChunks;
// TODO - to be refactored
private ShuffleType shuffleType = null;
private Schema finalSchema = null;
private TupleComparator sortComp = null;
+ private final int maxUrlLength;
public TaskImpl(final TaskRequest request,
final ExecutionBlockContext executionBlockContext) throws IOException {
@@ -122,6 +127,7 @@ public class TaskImpl implements Task {
this.context.setDataChannel(request.getDataChannel());
this.context.setEnforcer(request.getEnforcer());
this.context.setState(TaskAttemptState.TA_PENDING);
+ this.maxUrlLength = systemConf.getIntVar(ConfVars.PULLSERVER_FETCH_URL_MAX_LENGTH);
}
public void initPlan() throws IOException {
@@ -148,14 +154,15 @@ public class TaskImpl implements Task {
}
this.localChunks = Collections.synchronizedList(new ArrayList<>());
+ this.remoteChunks = Collections.synchronizedList(new ArrayList<>());
LOG.info(String.format("* Task %s is initialized. InterQuery: %b, Shuffle: %s, Fragments: %d, Fetches:%d, " +
"Local dir: %s", request.getId(), interQuery, shuffleType, request.getFragments().size(),
request.getFetches().size(), taskDir));
if(LOG.isDebugEnabled()) {
- for (FetchImpl f : request.getFetches()) {
- LOG.debug("Table Id: " + f.getName() + ", Simple URIs: " + f.getSimpleURIs());
+ for (FetchProto f : request.getFetches()) {
+ LOG.debug("Table Id: " + f.getName() + ", Simple URIs: " + Repartitioner.createSimpleURIs(maxUrlLength, f));
}
}
@@ -250,6 +257,18 @@ public class TaskImpl implements Task {
@Override
public void fetch(ExecutorService fetcherExecutor) {
+ // Sort the execution order of fetch runners to increase the cache hit in pull server
+ fetcherRunners.sort((f1, f2) -> {
+ String strUri = f1.getURI().toString();
+ int index = strUri.lastIndexOf("&ta");
+ String taskIdStr1 = strUri.substring(index + "&ta".length());
+
+ strUri = f2.getURI().toString();
+ index = strUri.lastIndexOf("&ta");
+ String taskIdStr2 = strUri.substring(index + "&ta".length());
+ return taskIdStr1.compareTo(taskIdStr2);
+ });
+
for (Fetcher f : fetcherRunners) {
fetcherExecutor.submit(new FetchRunner(context, f));
}
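The comparator above orders fetchers by the task id that trails the last "&ta" parameter of each fetch URI, so requests that hit the same task output reach the pull server back to back. A minimal, self-contained sketch of that ordering follows; the URI layout here is only an illustrative assumption, since the real URIs are built by Repartitioner.createFullURIs:

    import java.net.URI;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Comparator;
    import java.util.List;

    public class FetchOrderSketch {
      public static void main(String[] args) {
        // Hypothetical fetch URIs; only the trailing "&ta=<taskId>_<attemptId>" part matters here.
        List<URI> uris = new ArrayList<>(Arrays.asList(
            URI.create("http://host:8080/?qid=q1&sid=2&p=0&type=r&ta=3_0"),
            URI.create("http://host:8080/?qid=q1&sid=2&p=0&type=r&ta=1_0"),
            URI.create("http://host:8080/?qid=q1&sid=2&p=0&type=r&ta=2_0")));
        // Same key extraction as the comparator in TaskImpl.fetch(): everything after the last "&ta".
        uris.sort(Comparator.comparing(u -> {
          String s = u.toString();
          return s.substring(s.lastIndexOf("&ta") + "&ta".length());
        }));
        uris.forEach(System.out::println); // ...&ta=1_0, ...&ta=2_0, ...&ta=3_0
      }
    }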
@@ -375,8 +394,7 @@ public class TaskImpl implements Task {
if (broadcastTableNames.contains(inputTable)) {
continue;
}
- File tableDir = new File(context.getFetchIn(), inputTable);
- FileFragment[] frags = localizeFetchedData(tableDir, inputTable, descs.get(inputTable).getMeta());
+ FileFragment[] frags = localizeFetchedData(inputTable);
context.updateAssignedFragments(inputTable, frags);
}
}
@@ -540,24 +558,21 @@ public class TaskImpl implements Task {
return false;
}
- private FileFragment[] localizeFetchedData(File file, String name, TableMeta meta)
+ private FileFragment[] localizeFetchedData(String name)
throws IOException {
Configuration c = new Configuration(systemConf);
c.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, "file:///");
FileSystem fs = FileSystem.get(c);
- Path tablePath = new Path(file.getAbsolutePath());
List<FileFragment> listTablets = new ArrayList<>();
FileFragment tablet;
- FileStatus[] fileLists = fs.listStatus(tablePath);
- for (FileStatus f : fileLists) {
- if (f.getLen() == 0) {
- continue;
+ for (FileChunk chunk : remoteChunks) {
+ if (name.equals(chunk.getEbId())) {
+ tablet = new FileFragment(name, fs.makeQualified(new Path(chunk.getFile().getPath())), chunk.startOffset(), chunk.length());
+ listTablets.add(tablet);
}
- tablet = new FileFragment(name, fs.makeQualified(f.getPath()), 0l, f.getLen());
- listTablets.add(tablet);
}
// Special treatment for locally pseudo fetched chunks
@@ -604,11 +619,16 @@ public class TaskImpl implements Task {
LOG.warn("Retry on the fetch: " + fetcher.getURI() + " (" + retryNum + ")");
}
try {
- FileChunk fetched = fetcher.get();
- if (fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED && fetched != null
- && fetched.getFile() != null) {
- if (fetched.fromRemote() == false) {
- localChunks.add(fetched);
+ List<FileChunk> fetched = fetcher.get();
+ if (fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED) {
+ for (FileChunk eachFetch : fetched) {
+ if (eachFetch.getFile() != null) {
+ if (!eachFetch.fromRemote()) {
+ localChunks.add(eachFetch);
+ } else {
+ remoteChunks.add(eachFetch);
+ }
+ }
}
break;
}
@@ -658,7 +678,7 @@ public class TaskImpl implements Task {
}
private List<Fetcher> getFetchRunners(TaskAttemptContext ctx,
- List<FetchImpl> fetches) throws IOException {
+ List<FetchProto> fetches) throws IOException {
if (fetches.size() > 0) {
Path inputDir = executionBlockContext.getLocalDirAllocator().
@@ -668,50 +688,59 @@ public class TaskImpl implements Task {
int localStoreChunkCount = 0;
File storeDir;
File defaultStoreFile;
- FileChunk storeChunk = null;
+ List<FileChunk> storeChunkList = new ArrayList<>();
List<Fetcher> runnerList = Lists.newArrayList();
- for (FetchImpl f : fetches) {
+ for (FetchProto f : fetches) {
storeDir = new File(inputDir.toString(), f.getName());
if (!storeDir.exists()) {
if (!storeDir.mkdirs()) throw new IOException("Failed to create " + storeDir);
}
- for (URI uri : f.getURIs()) {
+ for (URI uri : Repartitioner.createFullURIs(maxUrlLength, f)) {
+ storeChunkList.clear();
defaultStoreFile = new File(storeDir, "in_" + i);
InetAddress address = InetAddress.getByName(uri.getHost());
WorkerConnectionInfo conn = executionBlockContext.getWorkerContext().getConnectionInfo();
if (NetUtils.isLocalAddress(address) && conn.getPullServerPort() == uri.getPort()) {
- storeChunk = getLocalStoredFileChunk(uri, systemConf);
+ List<FileChunk> localChunkCandidates = getLocalStoredFileChunk(uri, systemConf);
- // When a range request is out of range, storeChunk will be NULL. This case is normal state.
- // So, we should skip and don't need to create storeChunk.
- if (storeChunk == null || storeChunk.length() == 0) {
- continue;
- }
+ for (FileChunk localChunk : localChunkCandidates) {
+ // A range request outside the stored range yields a null chunk. This is a normal case,
+ // so we simply skip it and do not create a chunk.
+ if (localChunk == null || localChunk.length() == 0) {
+ continue;
+ }
- if (storeChunk.getFile() != null && storeChunk.startOffset() > -1) {
- storeChunk.setFromRemote(false);
- localStoreChunkCount++;
- } else {
- storeChunk = new FileChunk(defaultStoreFile, 0, -1);
- storeChunk.setFromRemote(true);
+ if (localChunk.getFile() != null && localChunk.startOffset() > -1) {
+ localChunk.setFromRemote(false);
+ localStoreChunkCount++;
+ } else {
+ localChunk = new FileChunk(defaultStoreFile, 0, -1);
+ localChunk.setFromRemote(true);
+ }
+ localChunk.setEbId(f.getName());
+ storeChunkList.add(localChunk);
}
+
} else {
- storeChunk = new FileChunk(defaultStoreFile, 0, -1);
- storeChunk.setFromRemote(true);
+ FileChunk remoteChunk = new FileChunk(defaultStoreFile, 0, -1);
+ remoteChunk.setFromRemote(true);
+ remoteChunk.setEbId(f.getName());
+ storeChunkList.add(remoteChunk);
}
// If we decide that intermediate data should be really fetched from a remote host, storeChunk
// represents a complete file. Otherwise, storeChunk may represent a complete file or only a part of it
- storeChunk.setEbId(f.getName());
- Fetcher fetcher = new Fetcher(systemConf, uri, storeChunk);
- runnerList.add(fetcher);
- i++;
- if (LOG.isDebugEnabled()) {
- LOG.debug("Create a new Fetcher with storeChunk:" + storeChunk.toString());
+ for (FileChunk eachChunk : storeChunkList) {
+ Fetcher fetcher = new Fetcher(systemConf, uri, eachChunk);
+ runnerList.add(fetcher);
+ i++;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Create a new Fetcher with storeChunk:" + eachChunk.toString());
+ }
}
}
}
@@ -724,7 +753,7 @@ public class TaskImpl implements Task {
}
}
- private FileChunk getLocalStoredFileChunk(URI fetchURI, TajoConf conf) throws IOException {
+ private List<FileChunk> getLocalStoredFileChunk(URI fetchURI, TajoConf conf) throws IOException {
// Parse the URI
// Parsing the URL into key-values
@@ -749,28 +778,37 @@ public class TaskImpl implements Task {
// The working directory of Tajo worker for each query, including stage
Path queryBaseDir = TajoPullServerService.getBaseOutputDir(queryId, sid);
- List<String> taskIds = TajoPullServerService.splitMaps(taskIdList);
- FileChunk chunk;
+ List<FileChunk> chunkList = new ArrayList<>();
// If the stage requires a range shuffle
if (shuffleType.equals("r")) {
- Path outputPath = StorageUtil.concatPath(queryBaseDir, taskIds.get(0), "output");
- if (!executionBlockContext.getLocalDirAllocator().ifExists(outputPath.toString(), conf)) {
- LOG.warn("Range shuffle - file not exist. " + outputPath);
- return null;
+ final String startKey = params.get("start").get(0);
+ final String endKey = params.get("end").get(0);
+ final boolean last = params.get("final") != null;
+ final List<String> taskIds = TajoPullServerService.splitMaps(taskIdList);
+
+ long before = System.currentTimeMillis();
+ for (String eachTaskId : taskIds) {
+ Path outputPath = StorageUtil.concatPath(queryBaseDir, eachTaskId, "output");
+ if (!executionBlockContext.getLocalDirAllocator().ifExists(outputPath.toString(), conf)) {
+ LOG.warn("Range shuffle - file not exist. " + outputPath);
+ continue;
+ }
+ Path path = executionBlockContext.getLocalFS().makeQualified(
+ executionBlockContext.getLocalDirAllocator().getLocalPathToRead(outputPath.toString(), conf));
+
+ try {
+ FileChunk chunk = TajoPullServerService.getFileChunks(queryId, sid, path, startKey, endKey, last);
+ chunkList.add(chunk);
+ } catch (Throwable t) {
+ LOG.error(t.getMessage(), t);
+ throw new IOException(t.getCause());
+ }
}
- Path path = executionBlockContext.getLocalFS().makeQualified(
- executionBlockContext.getLocalDirAllocator().getLocalPathToRead(outputPath.toString(), conf));
- String startKey = params.get("start").get(0);
- String endKey = params.get("end").get(0);
- boolean last = params.get("final") != null;
-
- try {
- chunk = TajoPullServerService.getFileChunks(path, startKey, endKey, last);
- } catch (Throwable t) {
- LOG.error(t.getMessage(), t);
- return null;
+ long after = System.currentTimeMillis();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Index lookup time: " + (after - before) + " ms");
}
// If the stage requires a hash shuffle or a scattered hash shuffle
@@ -779,8 +817,7 @@ public class TaskImpl implements Task {
Path partPath = StorageUtil.concatPath(queryBaseDir, "hash-shuffle", String.valueOf(partParentId), partId);
if (!executionBlockContext.getLocalDirAllocator().ifExists(partPath.toString(), conf)) {
- LOG.warn("Hash shuffle or Scattered hash shuffle - file not exist: " + partPath);
- return null;
+ throw new IOException("Hash shuffle or Scattered hash shuffle - file not exist: " + partPath);
}
Path path = executionBlockContext.getLocalFS().makeQualified(
executionBlockContext.getLocalDirAllocator().getLocalPathToRead(partPath.toString(), conf));
@@ -789,17 +826,16 @@ public class TaskImpl implements Task {
long readLen = (offset >= 0 && length >= 0) ? length : file.length();
if (startPos >= file.length()) {
- LOG.error("Start pos[" + startPos + "] great than file length [" + file.length() + "]");
- return null;
+ throw new IOException("Start pos[" + startPos + "] great than file length [" + file.length() + "]");
}
- chunk = new FileChunk(file, startPos, readLen);
+ FileChunk chunk = new FileChunk(file, startPos, readLen);
+ chunkList.add(chunk);
} else {
- LOG.error("Unknown shuffle type");
- return null;
+ throw new IOException("Unknown shuffle type");
}
- return chunk;
+ return chunkList;
}
public static Path getTaskAttemptDir(TaskAttemptId quid) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/1f9ae1da/tajo-core/src/main/proto/ResourceProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/ResourceProtos.proto b/tajo-core/src/main/proto/ResourceProtos.proto
index 3b9a85e..3643a97 100644
--- a/tajo-core/src/main/proto/ResourceProtos.proto
+++ b/tajo-core/src/main/proto/ResourceProtos.proto
@@ -79,15 +79,17 @@ message FetchProto {
required ExecutionBlockIdProto execution_block_id = 4;
required int32 partition_id = 5;
required string name = 6;
- optional string range_params = 7;
- optional bool has_next = 8 [default = false];
+ optional bytes range_start = 7;
+ optional bytes range_end = 8;
+ optional bool range_last_inclusive = 9;
+ optional bool has_next = 10 [default = false];
// repeated part
- repeated int32 task_id = 9 [packed = true];
- repeated int32 attempt_id = 10 [packed = true];
+ repeated int32 task_id = 11 [packed = true];
+ repeated int32 attempt_id = 12 [packed = true];
- optional int64 offset = 11;
- optional int64 length = 12;
+ optional int64 offset = 13;
+ optional int64 length = 14;
}
message TaskStatusProto {
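Replacing range_params with raw range_start/range_end bytes (plus range_last_inclusive) means a fetch now carries only the range keys, and workers rebuild the full fetch URLs on demand via Repartitioner.createFullURIs instead of the query master materializing them; the fields after it are renumbered accordingly (10-14). A hedged sketch of the encoding round trip those keys go through; the actual helper may differ in detail (for example URL-escaping), and the pull server Base64-decodes the "start"/"end" parameters in getFileChunks below:

    import org.apache.commons.codec.binary.Base64;

    public class RangeParamSketch {
      public static void main(String[] args) {
        // Hypothetical raw range keys; in the patch they come from FetchProto.range_start/range_end.
        byte[] rangeStart = {0x01, 0x02};
        byte[] rangeEnd = {0x7f, 0x7f};
        boolean lastInclusive = true; // FetchProto.range_last_inclusive

        // Worker side: encode the raw keys into the "start"/"end" URL parameters of a range fetch.
        String rangeQuery = "start=" + Base64.encodeBase64String(rangeStart)
            + "&end=" + Base64.encodeBase64String(rangeEnd)
            + (lastInclusive ? "&final=true" : "");
        System.out.println(rangeQuery);
        // Pull server side: getFileChunks() decodes the parameters back with Base64.decodeBase64
        // and turns them into tuples through a RowStoreDecoder.
      }
    }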
http://git-wip-us.apache.org/repos/asf/tajo/blob/1f9ae1da/tajo-core/src/main/resources/webapps/worker/task.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/worker/task.jsp b/tajo-core/src/main/resources/webapps/worker/task.jsp
index f2f903b..3e17c8a 100644
--- a/tajo-core/src/main/resources/webapps/worker/task.jsp
+++ b/tajo-core/src/main/resources/webapps/worker/task.jsp
@@ -21,26 +21,24 @@
<%@ page import="org.apache.tajo.ExecutionBlockId" %>
<%@ page import="org.apache.tajo.QueryId" %>
+<%@ page import="org.apache.tajo.ResourceProtos.FetchProto" %>
<%@ page import="org.apache.tajo.ResourceProtos.ShuffleFileOutput" %>
<%@ page import="org.apache.tajo.TaskId" %>
<%@ page import="org.apache.tajo.catalog.proto.CatalogProtos" %>
<%@ page import="org.apache.tajo.catalog.statistics.TableStats" %>
-<%@ page import="org.apache.tajo.querymaster.Query" %>
-<%@ page import="org.apache.tajo.querymaster.QueryMasterTask" %>
-<%@ page import="org.apache.tajo.querymaster.Stage" %>
-<%@ page import="org.apache.tajo.querymaster.Task" %>
+<%@ page import="org.apache.tajo.querymaster.*" %>
<%@ page import="org.apache.tajo.storage.DataLocation" %>
<%@ page import="org.apache.tajo.storage.fragment.Fragment" %>
<%@ page import="org.apache.tajo.storage.fragment.FragmentConvertor" %>
<%@ page import="org.apache.tajo.util.JSPUtil" %>
<%@ page import="org.apache.tajo.util.TajoIdUtils" %>
<%@ page import="org.apache.tajo.webapp.StaticHttpServer" %>
-<%@ page import="org.apache.tajo.worker.FetchImpl" %>
<%@ page import="org.apache.tajo.worker.TajoWorker" %>
<%@ page import="java.net.URI" %>
<%@ page import="java.text.SimpleDateFormat" %>
<%@ page import="java.util.Map" %>
<%@ page import="java.util.Set" %>
+<%@ page import="org.apache.tajo.conf.TajoConf.ConfVars" %>
<%
String paramQueryId = request.getParameter("queryId");
@@ -64,6 +62,9 @@
return;
}
+ int maxUrlLength = tajoWorker.getConfig().getInt(ConfVars.PULLSERVER_FETCH_URL_MAX_LENGTH.name(),
+ ConfVars.PULLSERVER_FETCH_URL_MAX_LENGTH.defaultIntVal);
+
Query query = queryMasterTask.getQuery();
Stage stage = query.getStage(ebid);
@@ -110,11 +111,11 @@
String fetchInfo = "";
delim = "";
- for (Map.Entry<String, Set<FetchImpl>> e : task.getFetchMap().entrySet()) {
+ for (Map.Entry<String, Set<FetchProto>> e : task.getFetchMap().entrySet()) {
fetchInfo += delim + "<b>" + e.getKey() + "</b>";
delim = "<br/>";
- for (FetchImpl f : e.getValue()) {
- for (URI uri : f.getSimpleURIs()){
+ for (FetchProto f : e.getValue()) {
+ for (URI uri : Repartitioner.createSimpleURIs(maxUrlLength, f)){
fetchInfo += delim + uri;
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1f9ae1da/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerHandler.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerHandler.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerHandler.java
index 68e8093..15cfdf4 100644
--- a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerHandler.java
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerHandler.java
@@ -26,6 +26,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.exception.ExceptionUtil;
import org.apache.tajo.pullserver.retriever.DataRetriever;
import org.apache.tajo.pullserver.retriever.FileChunk;
@@ -84,7 +85,7 @@ public class HttpDataServerHandler extends SimpleChannelInboundHandler<FullHttpR
FileChunk[] file = chunks.toArray(new FileChunk[chunks.size()]);
// Write the content.
- if (file == null) {
+ if (file.length == 0) {
HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NO_CONTENT);
if (!HttpHeaders.isKeepAlive(request)) {
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
@@ -171,7 +172,8 @@ public class HttpDataServerHandler extends SimpleChannelInboundHandler<FullHttpR
return;
}
- LOG.error(cause.getMessage(), cause);
+ LOG.error(cause.getMessage());
+ ExceptionUtil.printStackTraceIfError(LOG, cause);
if (ch.isActive()) {
sendError(ctx, HttpResponseStatus.INTERNAL_SERVER_ERROR);
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1f9ae1da/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
index ef3d7e0..4a7f2da 100644
--- a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
@@ -18,6 +18,10 @@
package org.apache.tajo.pullserver;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalListener;
import com.google.common.collect.Lists;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.Unpooled;
@@ -53,24 +57,29 @@ import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
import org.apache.hadoop.security.ssl.SSLFactory;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.exception.InvalidURLException;
import org.apache.tajo.pullserver.retriever.FileChunk;
import org.apache.tajo.rpc.NettyUtils;
import org.apache.tajo.storage.*;
import org.apache.tajo.storage.RowStoreUtil.RowStoreDecoder;
import org.apache.tajo.storage.index.bst.BSTIndex;
+import org.apache.tajo.storage.index.bst.BSTIndex.BSTIndexReader;
+import org.apache.tajo.unit.StorageUnit;
+import org.apache.tajo.util.TajoIdUtils;
import java.io.*;
import java.net.InetSocketAddress;
import java.net.URI;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
+import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
public class TajoPullServerService extends AbstractService {
@@ -88,6 +97,7 @@ public class TajoPullServerService extends AbstractService {
private final ChannelGroup accepted = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
private HttpChannelInitializer channelInitializer;
private int sslFileBufferSize;
+ private int maxUrlLength;
private ApplicationId appId;
private FileSystem localFS;
@@ -100,32 +110,62 @@ public class TajoPullServerService extends AbstractService {
private int readaheadLength;
private ReadaheadPool readaheadPool = ReadaheadPool.getInstance();
-
public static final String PULLSERVER_SERVICEID = "tajo.pullserver";
private static final Map<String,String> userRsrc =
new ConcurrentHashMap<>();
private String userName;
+ private static LoadingCache<CacheKey, BSTIndexReader> indexReaderCache = null;
+ private static int lowCacheHitCheckThreshold;
+
public static final String SUFFLE_SSL_FILE_BUFFER_SIZE_KEY =
"tajo.pullserver.ssl.file.buffer.size";
public static final int DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE = 60 * 1024;
- private static boolean STANDALONE = false;
+ private static final boolean STANDALONE;
private static final AtomicIntegerFieldUpdater<ProcessingStatus> SLOW_FILE_UPDATER;
private static final AtomicIntegerFieldUpdater<ProcessingStatus> REMAIN_FILE_UPDATER;
+ public static final String CHUNK_LENGTH_HEADER_NAME = "c";
+
+ static class CacheKey {
+ private Path path;
+ private String queryId;
+ private String ebSeqId;
+
+ public CacheKey(Path path, String queryId, String ebSeqId) {
+ this.path = path;
+ this.queryId = queryId;
+ this.ebSeqId = ebSeqId;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o instanceof CacheKey) {
+ CacheKey other = (CacheKey) o;
+ return Objects.equals(this.path, other.path)
+ && Objects.equals(this.queryId, other.queryId)
+ && Objects.equals(this.ebSeqId, other.ebSeqId);
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(path, queryId, ebSeqId);
+ }
+ }
+
static {
/* AtomicIntegerFieldUpdater can save the memory usage instead of AtomicInteger instance */
SLOW_FILE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ProcessingStatus.class, "numSlowFile");
REMAIN_FILE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ProcessingStatus.class, "remainFiles");
String standalone = System.getenv("TAJO_PULLSERVER_STANDALONE");
- if (!StringUtils.isEmpty(standalone)) {
- STANDALONE = standalone.equalsIgnoreCase("true");
- }
+ STANDALONE = !StringUtils.isEmpty(standalone) && standalone.equalsIgnoreCase("true");
}
@Metrics(name="PullServerShuffleMetrics", about="PullServer output metrics", context="tajo")
@@ -193,6 +233,9 @@ public class TajoPullServerService extends AbstractService {
localFS = new LocalFileSystem();
+ maxUrlLength = conf.getInt(ConfVars.PULLSERVER_FETCH_URL_MAX_LENGTH.name(),
+ ConfVars.PULLSERVER_FETCH_URL_MAX_LENGTH.defaultIntVal);
+
conf.setInt(TajoConf.ConfVars.PULLSERVER_PORT.varname
, conf.getInt(TajoConf.ConfVars.PULLSERVER_PORT.varname, TajoConf.ConfVars.PULLSERVER_PORT.defaultIntVal));
super.init(conf);
@@ -219,20 +262,35 @@ public class TajoPullServerService extends AbstractService {
bootstrap.childHandler(channelInitializer)
.channel(NioServerSocketChannel.class);
- port = conf.getInt(ConfVars.PULLSERVER_PORT.varname,
- ConfVars.PULLSERVER_PORT.defaultIntVal);
+ port = tajoConf.getIntVar(ConfVars.PULLSERVER_PORT);
ChannelFuture future = bootstrap.bind(new InetSocketAddress(port))
.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE)
.syncUninterruptibly();
accepted.add(future.channel());
port = ((InetSocketAddress)future.channel().localAddress()).getPort();
- conf.set(ConfVars.PULLSERVER_PORT.varname, Integer.toString(port));
+ tajoConf.set(ConfVars.PULLSERVER_PORT.varname, Integer.toString(port));
LOG.info(getName() + " listening on port " + port);
sslFileBufferSize = conf.getInt(SUFFLE_SSL_FILE_BUFFER_SIZE_KEY,
DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE);
+ int cacheSize = tajoConf.getIntVar(ConfVars.PULLSERVER_CACHE_SIZE);
+ int cacheTimeout = tajoConf.getIntVar(ConfVars.PULLSERVER_CACHE_TIMEOUT);
+
+ indexReaderCache = CacheBuilder.newBuilder()
+ .maximumSize(cacheSize)
+ .expireAfterWrite(cacheTimeout, TimeUnit.MINUTES)
+ .removalListener(removalListener)
+ .build(
+ new CacheLoader<CacheKey, BSTIndexReader>() {
+ @Override
+ public BSTIndexReader load(CacheKey key) throws Exception {
+ return new BSTIndex(tajoConf).getIndexReader(new Path(key.path, "index"));
+ }
+ }
+ );
+ lowCacheHitCheckThreshold = (int) (cacheSize * 0.1f);
if (STANDALONE) {
File pullServerPortFile = getPullServerPortFile();
@@ -312,6 +370,7 @@ public class TajoPullServerService extends AbstractService {
}
localFS.close();
+ indexReaderCache.invalidateAll();
} catch (Throwable t) {
LOG.error(t, t);
} finally {
@@ -348,7 +407,7 @@ public class TajoPullServerService extends AbstractService {
int maxChunkSize = getConfig().getInt(ConfVars.SHUFFLE_FETCHER_CHUNK_MAX_SIZE.varname,
ConfVars.SHUFFLE_FETCHER_CHUNK_MAX_SIZE.defaultIntVal);
- pipeline.addLast("codec", new HttpServerCodec(4096, 8192, maxChunkSize));
+ pipeline.addLast("codec", new HttpServerCodec(maxUrlLength, 8192, maxChunkSize));
pipeline.addLast("aggregator", new HttpObjectAggregator(1 << 16));
pipeline.addLast("chunking", new ChunkedWriteHandler());
pipeline.addLast("shuffle", PullServer);
@@ -422,7 +481,6 @@ public class TajoPullServerService extends AbstractService {
class PullServer extends SimpleChannelInboundHandler<FullHttpRequest> {
private final TajoConf conf;
-// private final IndexCache indexCache;
private final LocalDirAllocator lDirAlloc =
new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);
@@ -447,22 +505,36 @@ public class TajoPullServerService extends AbstractService {
public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request)
throws Exception {
- if (request.getMethod() != HttpMethod.GET) {
- sendError(ctx, HttpResponseStatus.METHOD_NOT_ALLOWED);
+ if (request.getDecoderResult().isFailure()) {
+ LOG.error("Http decoding failed. ", request.getDecoderResult().cause());
+ sendError(ctx, request.getDecoderResult().toString(), HttpResponseStatus.BAD_REQUEST);
return;
}
- ProcessingStatus processingStatus = new ProcessingStatus(request.getUri().toString());
- processingStatusMap.put(request.getUri().toString(), processingStatus);
+ if (request.getMethod() == HttpMethod.DELETE) {
+ HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NO_CONTENT);
+ ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
+
+ clearIndexCache(request.getUri());
+ return;
+ } else if (request.getMethod() != HttpMethod.GET) {
+ sendError(ctx, HttpResponseStatus.METHOD_NOT_ALLOWED);
+ return;
+ }
// Parsing the URL into key-values
Map<String, List<String>> params = null;
try {
params = decodeParams(request.getUri());
} catch (Throwable e) {
+ LOG.error("Failed to decode uri " + request.getUri());
sendError(ctx, e.getMessage(), HttpResponseStatus.BAD_REQUEST);
+ return;
}
+ ProcessingStatus processingStatus = new ProcessingStatus(request.getUri());
+ processingStatusMap.put(request.getUri(), processingStatus);
+
String partId = params.get("p").get(0);
String queryId = params.get("qid").get(0);
String shuffleType = params.get("type").get(0);
@@ -491,28 +563,33 @@ public class TajoPullServerService extends AbstractService {
// if a stage requires a range shuffle
if (shuffleType.equals("r")) {
- Path outputPath = StorageUtil.concatPath(queryBaseDir, taskIds.get(0), "output");
- if (!lDirAlloc.ifExists(outputPath.toString(), conf)) {
- LOG.warn(outputPath + "does not exist.");
- sendError(ctx, HttpResponseStatus.NO_CONTENT);
- return;
- }
- Path path = localFS.makeQualified(lDirAlloc.getLocalPathToRead(outputPath.toString(), conf));
- String startKey = params.get("start").get(0);
- String endKey = params.get("end").get(0);
- boolean last = params.get("final") != null;
-
- FileChunk chunk;
- try {
- chunk = getFileChunks(path, startKey, endKey, last);
- } catch (Throwable t) {
- LOG.error("ERROR Request: " + request.getUri(), t);
- sendError(ctx, "Cannot get file chunks to be sent", HttpResponseStatus.BAD_REQUEST);
- return;
- }
- if (chunk != null) {
- chunks.add(chunk);
+ final String startKey = params.get("start").get(0);
+ final String endKey = params.get("end").get(0);
+ final boolean last = params.get("final") != null;
+
+ long before = System.currentTimeMillis();
+ for (String eachTaskId : taskIds) {
+ Path outputPath = StorageUtil.concatPath(queryBaseDir, eachTaskId, "output");
+ if (!lDirAlloc.ifExists(outputPath.toString(), conf)) {
+ LOG.warn(outputPath + "does not exist.");
+ continue;
+ }
+ Path path = localFS.makeQualified(lDirAlloc.getLocalPathToRead(outputPath.toString(), conf));
+
+ FileChunk chunk;
+ try {
+ chunk = getFileChunks(queryId, sid, path, startKey, endKey, last);
+ } catch (Throwable t) {
+ LOG.error("ERROR Request: " + request.getUri(), t);
+ sendError(ctx, "Cannot get file chunks to be sent", HttpResponseStatus.BAD_REQUEST);
+ return;
+ }
+ if (chunk != null) {
+ chunks.add(chunk);
+ }
}
+ long after = System.currentTimeMillis();
+ LOG.info("Index lookup time: " + (after - before) + " ms");
// if a stage requires a hash shuffle or a scattered hash shuffle
} else if (shuffleType.equals("h") || shuffleType.equals("s")) {
@@ -536,7 +613,9 @@ public class TajoPullServerService extends AbstractService {
sendError(ctx, errorMessage, HttpResponseStatus.BAD_REQUEST);
return;
}
- LOG.info("RequestURL: " + request.getUri() + ", fileLen=" + file.length());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("RequestURL: " + request.getUri() + ", fileLen=" + file.length());
+ }
FileChunk chunk = new FileChunk(file, startPos, readLen);
chunks.add(chunk);
} else {
@@ -545,8 +624,6 @@ public class TajoPullServerService extends AbstractService {
return;
}
- processingStatus.setNumFiles(chunks.size());
- processingStatus.makeFileListTime = System.currentTimeMillis() - processingStatus.startTime;
// Write the content.
if (chunks.size() == 0) {
HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NO_CONTENT);
@@ -562,9 +639,13 @@ public class TajoPullServerService extends AbstractService {
ChannelFuture writeFuture = null;
HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
long totalSize = 0;
+ StringBuilder sb = new StringBuilder();
for (FileChunk chunk : file) {
totalSize += chunk.length();
+ sb.append(Long.toString(chunk.length())).append(",");
}
+ sb.deleteCharAt(sb.length() - 1);
+ HttpHeaders.addHeader(response, CHUNK_LENGTH_HEADER_NAME, sb.toString());
HttpHeaders.setContentLength(response, totalSize);
if (HttpHeaders.isKeepAlive(request)) {
@@ -580,6 +661,7 @@ public class TajoPullServerService extends AbstractService {
return;
}
}
+
if (ctx.pipeline().get(SslHandler.class) == null) {
writeFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
} else {
@@ -594,6 +676,49 @@ public class TajoPullServerService extends AbstractService {
}
}
+ /**
+ * Upon a request from TajoWorker, this method clears index cache for fetching data of an execution block.
+ * It is called whenever an execution block is completed.
+ *
+ * @param uri query URI which indicates the execution block id
+ * @throws IOException
+ * @throws InvalidURLException
+ */
+ private void clearIndexCache(String uri) throws IOException, InvalidURLException {
+ // Simply parse the given uri
+ String[] tokens = uri.split("=");
+ if (tokens.length != 2 || !tokens[0].equals("ebid")) {
+ throw new IllegalArgumentException("invalid params: " + uri);
+ }
+ ExecutionBlockId ebId = TajoIdUtils.createExecutionBlockId(tokens[1]);
+ String queryId = ebId.getQueryId().toString();
+ String ebSeqId = Integer.toString(ebId.getId());
+ List<CacheKey> removed = new ArrayList<>();
+ synchronized (indexReaderCache) {
+ for (Entry<CacheKey, BSTIndexReader> e : indexReaderCache.asMap().entrySet()) {
+ CacheKey key = e.getKey();
+ if (key.queryId.equals(queryId) && key.ebSeqId.equals(ebSeqId)) {
+ e.getValue().forceClose();
+ removed.add(e.getKey());
+ }
+ }
+ indexReaderCache.invalidateAll(removed);
+ }
+ removed.clear();
+ synchronized (waitForRemove) {
+ for (Entry<CacheKey, BSTIndexReader> e : waitForRemove.entrySet()) {
+ CacheKey key = e.getKey();
+ if (key.queryId.equals(queryId) && key.ebSeqId.equals(ebSeqId)) {
+ e.getValue().forceClose();
+ removed.add(e.getKey());
+ }
+ }
+ for (CacheKey eachKey : removed) {
+ waitForRemove.remove(eachKey);
+ }
+ }
+ }
+
private ChannelFuture sendFile(ChannelHandlerContext ctx,
FileChunk file,
String requestUri) throws IOException {
@@ -617,7 +742,7 @@ public class TajoPullServerService extends AbstractService {
writeFuture = ctx.write(new HttpChunkedInput(chunk));
}
} catch (FileNotFoundException e) {
- LOG.info(file.getFile() + " not found");
+ LOG.fatal(file.getFile() + " not found");
return null;
} catch (Throwable e) {
if (spill != null) {
@@ -656,26 +781,65 @@ public class TajoPullServerService extends AbstractService {
}
}
- public static FileChunk getFileChunks(Path outDir,
+ // Temporal space to wait for the completion of all index lookup operations
+ private static final ConcurrentHashMap<CacheKey, BSTIndexReader> waitForRemove = new ConcurrentHashMap<>();
+
+ // RemovalListener is triggered when an item is removed from the index reader cache.
+ // It closes index readers when they are not used anymore.
+ // If they are still being used, they are moved to waitForRemove map to wait for other operations' completion.
+ private static final RemovalListener<CacheKey, BSTIndexReader> removalListener = (removal) -> {
+ BSTIndexReader reader = removal.getValue();
+ if (reader.getReferenceNum() == 0) {
+ try {
+ reader.close(); // tear down properly
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ waitForRemove.remove(removal.getKey());
+ } else {
+ waitForRemove.put(removal.getKey(), reader);
+ }
+ };
+
+ public static FileChunk getFileChunks(String queryId,
+ String ebSeqId,
+ Path outDir,
String startKey,
String endKey,
- boolean last) throws IOException {
- BSTIndex index = new BSTIndex(new TajoConf());
- try (BSTIndex.BSTIndexReader idxReader = index.getIndexReader(new Path(outDir, "index"))) {
- Schema keySchema = idxReader.getKeySchema();
- TupleComparator comparator = idxReader.getComparator();
+ boolean last) throws IOException, ExecutionException {
+
+ BSTIndexReader idxReader = indexReaderCache.get(new CacheKey(outDir, queryId, ebSeqId));
+ idxReader.retain();
+ File data;
+ long startOffset;
+ long endOffset;
+ try {
if (LOG.isDebugEnabled()) {
- LOG.debug("BSTIndex is loaded from disk (" + idxReader.getFirstKey() + ", " + idxReader.getLastKey() + ")");
+ if (indexReaderCache.size() > lowCacheHitCheckThreshold && indexReaderCache.stats().hitRate() < 0.5) {
+ LOG.debug("Too low cache hit rate: " + indexReaderCache.stats());
+ }
+ }
+
+ Tuple indexedFirst = idxReader.getFirstKey();
+ Tuple indexedLast = idxReader.getLastKey();
+
+ if (indexedFirst == null && indexedLast == null) { // if # of rows is zero
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("There is no contents");
+ }
+ return null;
}
- File data = new File(URI.create(outDir.toUri() + "/output"));
byte[] startBytes = Base64.decodeBase64(startKey);
byte[] endBytes = Base64.decodeBase64(endKey);
- RowStoreDecoder decoder = RowStoreUtil.createDecoder(keySchema);
+
Tuple start;
Tuple end;
+ Schema keySchema = idxReader.getKeySchema();
+ RowStoreDecoder decoder = RowStoreUtil.createDecoder(keySchema);
+
try {
start = decoder.toTuple(startBytes);
} catch (Throwable t) {
@@ -690,23 +854,23 @@ public class TajoPullServerService extends AbstractService {
+ ", decoded byte size: " + endBytes.length, t);
}
- LOG.info("GET Request for " + data.getAbsolutePath() + " (start=" + start + ", end=" + end +
- (last ? ", last=true" : "") + ")");
-
- if (idxReader.getFirstKey() == null && idxReader.getLastKey() == null) { // if # of rows is zero
- LOG.info("There is no contents");
- return null;
+ data = new File(URI.create(outDir.toUri() + "/output"));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("GET Request for " + data.getAbsolutePath() + " (start=" + start + ", end=" + end +
+ (last ? ", last=true" : "") + ")");
}
- if (comparator.compare(end, idxReader.getFirstKey()) < 0 ||
- comparator.compare(idxReader.getLastKey(), start) < 0) {
- LOG.warn("Out of Scope (indexed data [" + idxReader.getFirstKey() + ", " + idxReader.getLastKey() +
- "], but request start:" + start + ", end: " + end);
+ TupleComparator comparator = idxReader.getComparator();
+
+ if (comparator.compare(end, indexedFirst) < 0 ||
+ comparator.compare(indexedLast, start) < 0) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Out of Scope (indexed data [" + indexedFirst + ", " + indexedLast +
+ "], but request start:" + start + ", end: " + end);
+ }
return null;
}
- long startOffset;
- long endOffset;
try {
idxReader.init();
startOffset = idxReader.find(start);
@@ -756,12 +920,14 @@ public class TajoPullServerService extends AbstractService {
&& comparator.compare(idxReader.getLastKey(), end) < 0)) {
endOffset = data.length();
}
+ } finally {
+ idxReader.release();
+ }
- FileChunk chunk = new FileChunk(data, startOffset, endOffset - startOffset);
+ FileChunk chunk = new FileChunk(data, startOffset, endOffset - startOffset);
- if (LOG.isDebugEnabled()) LOG.debug("Retrieve File Chunk: " + chunk);
- return chunk;
- }
+ if (LOG.isDebugEnabled()) LOG.debug("Retrieve File Chunk: " + chunk);
+ return chunk;
}
public static List<String> splitMaps(List<String> mapq) {
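Taken together, the loading cache, the removal listener, and the retain/release pair give every range request the same reader lifecycle. The following is a condensed paraphrase of getFileChunks() above, not additional code; the identifiers are the patch's own:

    BSTIndexReader reader = indexReaderCache.get(new CacheKey(outDir, queryId, ebSeqId)); // loaded on a cache miss
    reader.retain();                          // pin the reader while this request uses it
    try {
      reader.init();                          // now idempotent: fills the index data only once
      long startOffset = reader.find(start);  // start/end are the Base64-decoded range keys
      // ... derive endOffset and build the FileChunk over data/output ...
    } finally {
      reader.release();                       // after this, the removal listener is free to close the reader
    }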
http://git-wip-us.apache.org/repos/asf/tajo/blob/1f9ae1da/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java
index edb1179..e0051f4 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java
@@ -42,6 +42,8 @@ import java.nio.channels.FileChannel;
import java.util.LinkedList;
import java.util.Map;
import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import static org.apache.tajo.index.IndexProtos.TupleComparatorProto;
@@ -440,6 +442,9 @@ public class BSTIndex implements IndexMethod {
}
}
+ private static final AtomicIntegerFieldUpdater<BSTIndexReader> REFERENCE_UPDATER =
+ AtomicIntegerFieldUpdater.newUpdater(BSTIndexReader.class, "referenceNum");
+
/**
* BSTIndexReader is thread-safe.
*/
@@ -468,6 +473,10 @@ public class BSTIndex implements IndexMethod {
private RowStoreDecoder rowStoreDecoder;
+ private AtomicBoolean inited = new AtomicBoolean(false);
+
+ volatile int referenceNum;
+
/**
*
* @param fileName
@@ -488,6 +497,25 @@ public class BSTIndex implements IndexMethod {
open();
}
+ /**
+ * Increase the reference number of the index reader.
+ */
+ public void retain() {
+ REFERENCE_UPDATER.compareAndSet(this, referenceNum, referenceNum + 1);
+ }
+
+ /**
+ * Decrease the reference number of the index reader.
+ * This method must be called before {@link #close()}.
+ */
+ public void release() {
+ REFERENCE_UPDATER.compareAndSet(this, referenceNum, referenceNum - 1);
+ }
+
+ public int getReferenceNum() {
+ return referenceNum;
+ }
+
public Schema getKeySchema() {
return this.keySchema;
}
@@ -543,8 +571,10 @@ public class BSTIndex implements IndexMethod {
byteBuf.release();
}
- public void init() throws IOException {
- fillData();
+ public synchronized void init() throws IOException {
+ if (inited.compareAndSet(false, true)) {
+ fillData();
+ }
}
private void open()
@@ -684,6 +714,8 @@ public class BSTIndex implements IndexMethod {
} catch (IOException e) {
//TODO this block should fix correctly
counter--;
+ if (counter == 0)
+ LOG.info("counter: " + counter);
if (pos != -1) {
in.seek(pos);
}
@@ -765,6 +797,9 @@ public class BSTIndex implements IndexMethod {
//http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6412541
int centerPos = (start + end) >>> 1;
+ if (arr.length == 0) {
+ LOG.error("arr.length: 0, loadNum: " + loadNum + ", inited: " + inited.get());
+ }
while (true) {
if (comparator.compare(arr[centerPos], key) > 0) {
if (centerPos == 0) {
@@ -800,8 +835,23 @@ public class BSTIndex implements IndexMethod {
return offset;
}
+ /**
+ * Close index reader only when it is not used anymore.
+ */
@Override
public void close() throws IOException {
+ if (referenceNum == 0) {
+ this.indexIn.close();
+ }
+ }
+
+ /**
+ * Close index reader even though it is being used.
+ *
+ * @throws IOException
+ */
+ public void forceClose() throws IOException {
+ REFERENCE_UPDATER.compareAndSet(this, referenceNum, 0);
this.indexIn.close();
}
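close() and forceClose() differ only in how they treat outstanding references: close() is a no-op while any caller still retains the reader (the common case while it sits in the pull server's cache), whereas forceClose(), called from clearIndexCache() when an execution block finishes, drops the reference count and closes unconditionally. A small hedged illustration of the contract; the reader construction and its conf/path arguments are assumed:

    BSTIndexReader reader = new BSTIndex(conf).getIndexReader(new Path(taskOutputDir, "index"));
    reader.retain();
    reader.close();       // no-op: referenceNum > 0, another lookup may still be in flight
    reader.release();
    reader.close();       // referenceNum == 0, so the underlying index stream is closed
    // reader.forceClose() would have closed it regardless of the reference count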
[2/2] tajo git commit: TAJO-1950: Query master uses too much memory during range shuffle.
Posted by ji...@apache.org.
TAJO-1950: Query master uses too much memory during range shuffle.
Closes #884
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/1f9ae1da
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/1f9ae1da
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/1f9ae1da
Branch: refs/heads/master
Commit: 1f9ae1da0424731567cea18e975c47d4479b0ae9
Parents: e8ee7f2
Author: Jihoon Son <ji...@apache.org>
Authored: Thu Dec 24 18:17:56 2015 +0900
Committer: Jihoon Son <ji...@apache.org>
Committed: Thu Dec 24 18:17:56 2015 +0900
----------------------------------------------------------------------
CHANGES | 2 +
.../java/org/apache/tajo/conf/TajoConf.java | 4 +
.../apache/tajo/ha/TestHAServiceHDFSImpl.java | 4 +-
.../apache/tajo/master/TestRepartitioner.java | 81 ++---
.../org/apache/tajo/worker/TestFetcher.java | 2 +-
.../apache/tajo/engine/query/TaskRequest.java | 6 +-
.../tajo/engine/query/TaskRequestImpl.java | 13 +-
.../org/apache/tajo/engine/utils/TupleUtil.java | 32 --
.../tajo/querymaster/DefaultTaskScheduler.java | 13 +-
.../tajo/querymaster/FetchScheduleEvent.java | 7 +-
.../apache/tajo/querymaster/Repartitioner.java | 206 ++++++++-----
.../java/org/apache/tajo/querymaster/Stage.java | 3 +-
.../java/org/apache/tajo/querymaster/Task.java | 48 ++-
.../tajo/worker/ExecutionBlockContext.java | 68 ++++-
.../java/org/apache/tajo/worker/FetchImpl.java | 140 ++++-----
.../java/org/apache/tajo/worker/Fetcher.java | 45 ++-
.../java/org/apache/tajo/worker/TaskImpl.java | 174 ++++++-----
tajo-core/src/main/proto/ResourceProtos.proto | 14 +-
.../src/main/resources/webapps/worker/task.jsp | 17 +-
.../tajo/pullserver/HttpDataServerHandler.java | 6 +-
.../tajo/pullserver/TajoPullServerService.java | 302 ++++++++++++++-----
.../apache/tajo/storage/index/bst/BSTIndex.java | 54 +++-
22 files changed, 811 insertions(+), 430 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/1f9ae1da/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 2edf9d7..c02bfc7 100644
--- a/CHANGES
+++ b/CHANGES
@@ -8,6 +8,8 @@ Release 0.12.0 - unreleased
IMPROVEMENT
+ TAJO-1950: Query master uses too much memory during range shuffle. (jihoon)
+
TAJO-1858: Aligning error message in execute query page of web UI is needed.
(Byunghwa Yun via jihoon)
http://git-wip-us.apache.org/repos/asf/tajo/blob/1f9ae1da/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index 9ab3dfa..29cf9ee 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -212,6 +212,10 @@ public class TajoConf extends Configuration {
// Shuffle Configuration --------------------------------------------------
PULLSERVER_PORT("tajo.pullserver.port", 0, Validators.range("0", "65535")),
+ PULLSERVER_CACHE_SIZE("tajo.pullserver.index-cache.size", 10000, Validators.min("1")),
+ PULLSERVER_CACHE_TIMEOUT("tajo.pullserver.index-cache.timeout-min", 5, Validators.min("1")),
+ PULLSERVER_FETCH_URL_MAX_LENGTH("tajo.pullserver.fetch-url.max-length", StorageUnit.KB,
+ Validators.min("1")),
SHUFFLE_SSL_ENABLED_KEY("tajo.pullserver.ssl.enabled", false, Validators.bool()),
SHUFFLE_FILE_FORMAT("tajo.shuffle.file-format", BuiltinStorages.RAW, Validators.javaString()),
SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM("tajo.shuffle.fetcher.parallel-execution.max-num",
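PULLSERVER_CACHE_SIZE and PULLSERVER_CACHE_TIMEOUT size the new index reader cache in the pull server, and PULLSERVER_FETCH_URL_MAX_LENGTH bounds the fetch URL length (it also sizes HttpServerCodec's initial request line). A hedged example of overriding them with illustrative values; the defaults are 10000 entries, 5 minutes, and 1 KB, and the same keys can equally be set in tajo-site.xml:

    import org.apache.tajo.conf.TajoConf;
    import org.apache.tajo.conf.TajoConf.ConfVars;

    public class PullServerTuningSketch {
      public static void main(String[] args) {
        TajoConf conf = new TajoConf();
        // Keep up to 20,000 index readers cached in the pull server (default 10,000).
        conf.setInt(ConfVars.PULLSERVER_CACHE_SIZE.varname, 20000);
        // Evict cached readers 10 minutes after they were loaded (default 5).
        conf.setInt(ConfVars.PULLSERVER_CACHE_TIMEOUT.varname, 10);
        // Allow fetch URLs up to 4 KB (default 1 KB).
        conf.setInt(ConfVars.PULLSERVER_FETCH_URL_MAX_LENGTH.varname, 4096);
      }
    }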
http://git-wip-us.apache.org/repos/asf/tajo/blob/1f9ae1da/tajo-core-tests/src/test/java/org/apache/tajo/ha/TestHAServiceHDFSImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/ha/TestHAServiceHDFSImpl.java b/tajo-core-tests/src/test/java/org/apache/tajo/ha/TestHAServiceHDFSImpl.java
index 279fce7..81eeb1f 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/ha/TestHAServiceHDFSImpl.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/ha/TestHAServiceHDFSImpl.java
@@ -88,7 +88,9 @@ public class TestHAServiceHDFSImpl {
assertEquals(2, fs.listStatus(activePath).length);
assertEquals(0, fs.listStatus(backupPath).length);
} finally {
- backupMaster.stop();
+ if (backupMaster != null) {
+ backupMaster.stop();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1f9ae1da/tajo-core-tests/src/test/java/org/apache/tajo/master/TestRepartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/master/TestRepartitioner.java b/tajo-core-tests/src/test/java/org/apache/tajo/master/TestRepartitioner.java
index b0a3a17..c260ab6 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/master/TestRepartitioner.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/master/TestRepartitioner.java
@@ -29,13 +29,15 @@ import org.apache.tajo.TestTajoIds;
import org.apache.tajo.querymaster.Repartitioner;
import org.apache.tajo.querymaster.Task;
import org.apache.tajo.querymaster.Task.IntermediateEntry;
+import org.apache.tajo.querymaster.Task.PullHost;
+import org.apache.tajo.unit.StorageUnit;
import org.apache.tajo.util.Pair;
-import org.apache.tajo.util.TUtil;
import org.apache.tajo.worker.FetchImpl;
import org.junit.Test;
import java.net.URI;
import java.util.*;
+import java.util.stream.Collectors;
import static junit.framework.Assert.assertEquals;
import static org.apache.tajo.plan.serder.PlanProto.ShuffleType;
@@ -70,11 +72,9 @@ public class TestRepartitioner {
new HashMap<>();
for (Map.Entry<Integer, List<IntermediateEntry>> eachEntry: intermediateEntries.entrySet()) {
- FetchImpl fetch = new FetchImpl(new Task.PullHost(hostName, port), ShuffleType.HASH_SHUFFLE,
+ FetchImpl fetch = new FetchImpl(sid.toString(), new Task.PullHost(hostName, port), ShuffleType.HASH_SHUFFLE,
sid, eachEntry.getKey(), eachEntry.getValue());
- fetch.setName(sid.toString());
-
FetchProto proto = fetch.getProto();
fetch = new FetchImpl(proto);
assertEquals(proto, fetch.getProto());
@@ -84,7 +84,7 @@ public class TestRepartitioner {
hashEntries.put(eachEntry.getKey(), ebEntries);
- List<URI> uris = fetch.getURIs();
+ List<URI> uris = Repartitioner.createFullURIs(2 * StorageUnit.KB, fetch.getProto());
assertEquals(1, uris.size()); //In Hash Suffle, Fetcher return only one URI per partition.
URI uri = uris.get(0);
@@ -119,7 +119,7 @@ public class TestRepartitioner {
ExecutionBlockId ebId = new ExecutionBlockId(LocalTajoTestingUtility.newQueryId(), 0);
FetchImpl [] fetches = new FetchImpl[12];
for (int i = 0; i < 12; i++) {
- fetches[i] = new FetchImpl(new Task.PullHost("localhost", 10000 + i), HASH_SHUFFLE, ebId, i / 2);
+ fetches[i] = new FetchImpl(tableName, new Task.PullHost("localhost", 10000 + i), HASH_SHUFFLE, ebId, i / 2);
}
int [] VOLUMES = {100, 80, 70, 30, 10, 5};
@@ -128,37 +128,40 @@ public class TestRepartitioner {
fetchGroups.put(i, new FetchGroupMeta(VOLUMES[i / 2], fetches[i]).addFetche(fetches[i + 1]));
}
- Pair<Long [], Map<String, List<FetchImpl>>[]> results;
+ FetchProto[] expectedProtos = new FetchProto[fetches.length];
+ expectedProtos = Arrays.stream(fetches).map(fetch -> fetch.getProto()).collect(Collectors.toList())
+ .toArray(expectedProtos);
+ Pair<Long [], Map<String, List<FetchProto>>[]> results;
results = Repartitioner.makeEvenDistributedFetchImpl(fetchGroups, tableName, 1);
long expected [] = {100 + 80 + 70 + 30 + 10 + 5};
assertFetchVolumes(expected, results.getFirst());
- assertFetchImpl(fetches, results.getSecond());
+ assertFetchProto(expectedProtos, results.getSecond());
results = Repartitioner.makeEvenDistributedFetchImpl(fetchGroups, tableName, 2);
long expected0 [] = {140, 155};
assertFetchVolumes(expected0, results.getFirst());
- assertFetchImpl(fetches, results.getSecond());
+ assertFetchProto(expectedProtos, results.getSecond());
results = Repartitioner.makeEvenDistributedFetchImpl(fetchGroups, tableName, 3);
long expected1 [] = {100, 95, 100};
assertFetchVolumes(expected1, results.getFirst());
- assertFetchImpl(fetches, results.getSecond());
+ assertFetchProto(expectedProtos, results.getSecond());
results = Repartitioner.makeEvenDistributedFetchImpl(fetchGroups, tableName, 4);
long expected2 [] = {100, 80, 70, 45};
assertFetchVolumes(expected2, results.getFirst());
- assertFetchImpl(fetches, results.getSecond());
+ assertFetchProto(expectedProtos, results.getSecond());
results = Repartitioner.makeEvenDistributedFetchImpl(fetchGroups, tableName, 5);
long expected3 [] = {100, 80, 70, 30, 15};
assertFetchVolumes(expected3, results.getFirst());
- assertFetchImpl(fetches, results.getSecond());
+ assertFetchProto(expectedProtos, results.getSecond());
results = Repartitioner.makeEvenDistributedFetchImpl(fetchGroups, tableName, 6);
long expected4 [] = {100, 80, 70, 30, 10, 5};
assertFetchVolumes(expected4, results.getFirst());
- assertFetchImpl(fetches, results.getSecond());
+ assertFetchProto(expectedProtos, results.getSecond());
}
private static void assertFetchVolumes(long [] expected, Long [] results) {
@@ -191,7 +194,8 @@ public class TestRepartitioner {
}
long splitVolume = 128 * 1024 * 1024;
- List<List<FetchImpl>> fetches = Repartitioner.splitOrMergeIntermediates(null, intermediateEntries,
+ ExecutionBlockId ebId = new ExecutionBlockId(LocalTajoTestingUtility.newQueryId(), 0);
+ List<List<FetchProto>> fetches = Repartitioner.splitOrMergeIntermediates("name", ebId, intermediateEntries,
splitVolume, 10 * 1024 * 1024);
assertEquals(6, fetches.size());
@@ -199,10 +203,10 @@ public class TestRepartitioner {
int index = 0;
int numZeroPosFetcher = 0;
long totalLength = 0;
- for (List<FetchImpl> eachFetchList: fetches) {
+ for (List<FetchProto> eachFetchList: fetches) {
totalInterms += eachFetchList.size();
long eachFetchVolume = 0;
- for (FetchImpl eachFetch: eachFetchList) {
+ for (FetchProto eachFetch: eachFetchList) {
eachFetchVolume += eachFetch.getLength();
if (eachFetch.getOffset() == 0) {
numZeroPosFetcher++;
@@ -248,8 +252,9 @@ public class TestRepartitioner {
intermediateEntries.add(interm);
}
+ ExecutionBlockId ebId = new ExecutionBlockId(LocalTajoTestingUtility.newQueryId(), 0);
long splitVolume = 128 * 1024 * 1024;
- List<List<FetchImpl>> fetches = Repartitioner.splitOrMergeIntermediates(null, intermediateEntries,
+ List<List<FetchProto>> fetches = Repartitioner.splitOrMergeIntermediates("name", ebId, intermediateEntries,
splitVolume, 10 * 1024 * 1024);
assertEquals(32, fetches.size());
@@ -258,15 +263,15 @@ public class TestRepartitioner {
long totalLength = 0;
Set<String> uniqPullHost = new HashSet<>();
- for (List<FetchImpl> eachFetchList: fetches) {
+ for (List<FetchProto> eachFetchList: fetches) {
long length = 0;
- for (FetchImpl eachFetch: eachFetchList) {
+ for (FetchProto eachFetch: eachFetchList) {
if (eachFetch.getOffset() == 0) {
numZeroPosFetcher++;
}
totalLength += eachFetch.getLength();
length += eachFetch.getLength();
- uniqPullHost.add(eachFetch.getPullHost().toString());
+ uniqPullHost.add(new PullHost(eachFetch.getHost(), eachFetch.getPort()).toString());
}
assertTrue(length + " should be smaller than splitVolume", length < splitVolume);
if (index < fetches.size() - 1) {
@@ -378,7 +383,8 @@ public class TestRepartitioner {
}
long splitVolume = 256 * 1024 * 1024;
- List<List<FetchImpl>> fetches = Repartitioner.splitOrMergeIntermediates(null, entries, splitVolume,
+ ExecutionBlockId ebId = new ExecutionBlockId(LocalTajoTestingUtility.newQueryId(), 0);
+ List<List<FetchProto>> fetches = Repartitioner.splitOrMergeIntermediates("name", ebId, entries, splitVolume,
10 * 1024 * 1024);
@@ -393,13 +399,13 @@ public class TestRepartitioner {
{728355084,121760359},
};
int index = 0;
- for (List<FetchImpl> eachFetchList: fetches) {
+ for (List<FetchProto> eachFetchList: fetches) {
if (index == 3) {
assertEquals(2, eachFetchList.size());
} else {
assertEquals(1, eachFetchList.size());
}
- for (FetchImpl eachFetch: eachFetchList) {
+ for (FetchProto eachFetch: eachFetchList) {
assertEquals(expected[index][0], eachFetch.getOffset());
assertEquals(expected[index][1], eachFetch.getLength());
index++;
@@ -438,13 +444,14 @@ public class TestRepartitioner {
}
long splitVolume = 128 * 1024 * 1024;
- List<List<FetchImpl>> fetches = Repartitioner.splitOrMergeIntermediates(null, intermediateEntries,
+ ExecutionBlockId ebId = new ExecutionBlockId(LocalTajoTestingUtility.newQueryId(), 0);
+ List<List<FetchProto>> fetches = Repartitioner.splitOrMergeIntermediates("name", ebId, intermediateEntries,
splitVolume, 10 * 1024 * 1024);
assertEquals(32, fetches.size());
int expectedSize = 0;
- Set<FetchImpl> fetchSet = new HashSet<>();
- for(List<FetchImpl> list : fetches){
+ Set<FetchProto> fetchSet = new HashSet<>();
+ for(List<FetchProto> list : fetches){
expectedSize += list.size();
fetchSet.addAll(list);
}
@@ -456,15 +463,15 @@ public class TestRepartitioner {
long totalLength = 0;
Set<String> uniqPullHost = new HashSet<>();
- for (List<FetchImpl> eachFetchList: fetches) {
+ for (List<FetchProto> eachFetchList: fetches) {
long length = 0;
- for (FetchImpl eachFetch: eachFetchList) {
+ for (FetchProto eachFetch: eachFetchList) {
if (eachFetch.getOffset() == 0) {
numZeroPosFetcher++;
}
totalLength += eachFetch.getLength();
length += eachFetch.getLength();
- uniqPullHost.add(eachFetch.getPullHost().toString());
+ uniqPullHost.add(new PullHost(eachFetch.getHost(), eachFetch.getPort()).toString());
}
assertTrue(length + " should be smaller than splitVolume", length < splitVolume);
if (index < fetches.size() - 1) {
@@ -482,25 +489,25 @@ public class TestRepartitioner {
ExecutionBlockId ebId = new ExecutionBlockId(LocalTajoTestingUtility.newQueryId(), 0);
Task.PullHost pullHost = new Task.PullHost("localhost", 0);
- FetchImpl expected = new FetchImpl(pullHost, SCATTERED_HASH_SHUFFLE, ebId, 1);
- FetchImpl fetch2 = new FetchImpl(pullHost, SCATTERED_HASH_SHUFFLE, ebId, 1);
+ FetchImpl expected = new FetchImpl("name", pullHost, SCATTERED_HASH_SHUFFLE, ebId, 1);
+ FetchImpl fetch2 = new FetchImpl("name", pullHost, SCATTERED_HASH_SHUFFLE, ebId, 1);
assertEquals(expected, fetch2);
fetch2.setOffset(5);
fetch2.setLength(10);
assertNotEquals(expected, fetch2);
}
- private static void assertFetchImpl(FetchImpl [] expected, Map<String, List<FetchImpl>>[] result) {
- Set<FetchImpl> expectedURLs = Sets.newHashSet();
+ private static void assertFetchProto(FetchProto [] expected, Map<String, List<FetchProto>>[] result) {
+ Set<FetchProto> expectedURLs = Sets.newHashSet();
- for (FetchImpl f : expected) {
+ for (FetchProto f : expected) {
expectedURLs.add(f);
}
- Set<FetchImpl> resultURLs = Sets.newHashSet();
+ Set<FetchProto> resultURLs = Sets.newHashSet();
- for (Map<String, List<FetchImpl>> e : result) {
- for (List<FetchImpl> list : e.values()) {
+ for (Map<String, List<FetchProto>> e : result) {
+ for (List<FetchProto> list : e.values()) {
resultURLs.addAll(list);
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1f9ae1da/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestFetcher.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestFetcher.java b/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestFetcher.java
index a91fc30..dfc37b0 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestFetcher.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestFetcher.java
@@ -90,7 +90,7 @@ public class TestFetcher {
FileChunk storeChunk = new FileChunk(new File(OUTPUT_DIR + "data"), 0, 0);
storeChunk.setFromRemote(true);
final Fetcher fetcher = new Fetcher(conf, uri, storeChunk);
- FileChunk chunk = fetcher.get();
+ FileChunk chunk = fetcher.get().get(0);
assertNotNull(chunk);
assertNotNull(chunk.getFile());
http://git-wip-us.apache.org/repos/asf/tajo/blob/1f9ae1da/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequest.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequest.java b/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequest.java
index 48d4780..ef4ff60 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequest.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequest.java
@@ -21,6 +21,7 @@
*/
package org.apache.tajo.engine.query;
+import org.apache.tajo.ResourceProtos.FetchProto;
import org.apache.tajo.ResourceProtos.TaskRequestProto;
import org.apache.tajo.TaskAttemptId;
import org.apache.tajo.catalog.proto.CatalogProtos;
@@ -29,7 +30,6 @@ import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.planner.enforce.Enforcer;
import org.apache.tajo.engine.planner.global.DataChannel;
import org.apache.tajo.plan.serder.PlanProto;
-import org.apache.tajo.worker.FetchImpl;
import java.util.List;
@@ -39,8 +39,8 @@ public interface TaskRequest extends ProtoObject<TaskRequestProto> {
List<CatalogProtos.FragmentProto> getFragments();
PlanProto.LogicalNodeTree getPlan();
void setInterQuery();
- void addFetch(String name, FetchImpl fetch);
- List<FetchImpl> getFetches();
+ void addFetch(FetchProto fetch);
+ List<FetchProto> getFetches();
QueryContext getQueryContext(TajoConf conf);
DataChannel getDataChannel();
Enforcer getEnforcer();
http://git-wip-us.apache.org/repos/asf/tajo/blob/1f9ae1da/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequestImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequestImpl.java b/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequestImpl.java
index 7b52dab..fc7556c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequestImpl.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequestImpl.java
@@ -42,7 +42,7 @@ public class TaskRequestImpl implements TaskRequest {
private boolean clusteredOutput;
private PlanProto.LogicalNodeTree plan; // logical node
private Boolean interQuery;
- private List<FetchImpl> fetches;
+ private List<FetchProto> fetches;
private QueryContext queryContext;
private DataChannel dataChannel;
private Enforcer enforcer;
@@ -157,10 +157,10 @@ public class TaskRequestImpl implements TaskRequest {
this.interQuery = true;
}
- public void addFetch(String name, FetchImpl fetch) {
+ @Override
+ public void addFetch(FetchProto fetch) {
maybeInitBuilder();
initFetches();
- fetch.setName(name);
fetches.add(fetch);
}
@@ -212,7 +212,8 @@ public class TaskRequestImpl implements TaskRequest {
return this.enforcer;
}
- public List<FetchImpl> getFetches() {
+ @Override
+ public List<FetchProto> getFetches() {
initFetches();
return this.fetches;
@@ -225,7 +226,7 @@ public class TaskRequestImpl implements TaskRequest {
TaskRequestProtoOrBuilder p = viaProto ? proto : builder;
this.fetches = new ArrayList<>();
for(FetchProto fetch : p.getFetchesList()) {
- fetches.add(new FetchImpl(fetch));
+ fetches.add(fetch);
}
}
@@ -259,7 +260,7 @@ public class TaskRequestImpl implements TaskRequest {
}
if (this.fetches != null) {
for (int i = 0; i < fetches.size(); i++) {
- builder.addFetches(fetches.get(i).getProto());
+ builder.addFetches(fetches.get(i));
}
}
if (this.queryMasterHostAndPort != null) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/1f9ae1da/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java b/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java
index bae04c8..31c23f4 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java
@@ -20,7 +20,6 @@ package org.apache.tajo.engine.utils;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
-import org.apache.commons.codec.binary.Base64;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.tajo.catalog.Column;
@@ -29,47 +28,16 @@ import org.apache.tajo.catalog.SortSpec;
import org.apache.tajo.catalog.statistics.ColumnStats;
import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.datum.NullDatum;
-import org.apache.tajo.storage.RowStoreUtil;
-import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.TupleRange;
import org.apache.tajo.storage.VTuple;
-import java.io.UnsupportedEncodingException;
-import java.net.URLEncoder;
import java.util.List;
import java.util.Map;
public class TupleUtil {
private static final Log LOG = LogFactory.getLog(TupleUtil.class);
- public static String rangeToQuery(Schema schema, TupleRange range, boolean last)
- throws UnsupportedEncodingException {
- return rangeToQuery(range, last, RowStoreUtil.createEncoder(schema));
- }
-
- public static String rangeToQuery(TupleRange range, boolean last, RowStoreEncoder encoder)
- throws UnsupportedEncodingException {
- StringBuilder sb = new StringBuilder();
- byte [] firstKeyBytes = encoder.toBytes(range.getStart());
- byte [] endKeyBytes = encoder.toBytes(range.getEnd());
-
- String firstKeyBase64 = new String(Base64.encodeBase64(firstKeyBytes));
- String lastKeyBase64 = new String(Base64.encodeBase64(endKeyBytes));
-
- sb.append("start=")
- .append(URLEncoder.encode(firstKeyBase64, "utf-8"))
- .append("&")
- .append("end=")
- .append(URLEncoder.encode(lastKeyBase64, "utf-8"));
-
- if (last) {
- sb.append("&final=true");
- }
-
- return sb.toString();
- }
-
/**
* if max value is null, set ranges[last]
* @param sortSpecs
http://git-wip-us.apache.org/repos/asf/tajo/blob/1f9ae1da/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java
index f1c0f62..26dc103 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java
@@ -52,7 +52,6 @@ import org.apache.tajo.storage.fragment.Fragment;
import org.apache.tajo.util.NetUtils;
import org.apache.tajo.util.RpcParameterFactory;
import org.apache.tajo.util.TUtil;
-import org.apache.tajo.worker.FetchImpl;
import java.net.InetSocketAddress;
import java.util.*;
@@ -233,11 +232,11 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
}
} else if (event instanceof FetchScheduleEvent) {
FetchScheduleEvent castEvent = (FetchScheduleEvent) event;
- Map<String, List<FetchImpl>> fetches = castEvent.getFetches();
+ Map<String, List<FetchProto>> fetches = castEvent.getFetches();
TaskAttemptScheduleContext taskScheduleContext = new TaskAttemptScheduleContext();
Task task = Stage.newEmptyTask(context, taskScheduleContext, stage, nextTaskId++);
scheduledObjectNum++;
- for (Entry<String, List<FetchImpl>> eachFetch : fetches.entrySet()) {
+ for (Entry<String, List<FetchProto>> eachFetch : fetches.entrySet()) {
task.addFetches(eachFetch.getKey(), eachFetch.getValue());
task.addFragment(fragmentsForNonLeafTask[0], true);
if (fragmentsForNonLeafTask[1] != null) {
@@ -983,11 +982,11 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
if (checkIfInterQuery(stage.getMasterPlan(), stage.getBlock())) {
taskAssign.setInterQuery();
}
- for(Map.Entry<String, Set<FetchImpl>> entry: task.getFetchMap().entrySet()) {
- Collection<FetchImpl> fetches = entry.getValue();
+ for(Map.Entry<String, Set<FetchProto>> entry: task.getFetchMap().entrySet()) {
+ Collection<FetchProto> fetches = entry.getValue();
if (fetches != null) {
- for (FetchImpl fetch : fetches) {
- taskAssign.addFetch(entry.getKey(), fetch);
+ for (FetchProto fetch : fetches) {
+ taskAssign.addFetch(fetch);
}
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1f9ae1da/tajo-core/src/main/java/org/apache/tajo/querymaster/FetchScheduleEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/FetchScheduleEvent.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/FetchScheduleEvent.java
index 5fe2f80..e4e63b4 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/FetchScheduleEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/FetchScheduleEvent.java
@@ -19,6 +19,7 @@
package org.apache.tajo.querymaster;
import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.ResourceProtos.FetchProto;
import org.apache.tajo.master.event.TaskSchedulerEvent;
import org.apache.tajo.worker.FetchImpl;
@@ -26,15 +27,15 @@ import java.util.List;
import java.util.Map;
public class FetchScheduleEvent extends TaskSchedulerEvent {
- private final Map<String, List<FetchImpl>> fetches;
+ private final Map<String, List<FetchProto>> fetches; // map of table name and fetch list
public FetchScheduleEvent(final EventType eventType, final ExecutionBlockId blockId,
- final Map<String, List<FetchImpl>> fetches) {
+ final Map<String, List<FetchProto>> fetches) {
super(eventType, blockId);
this.fetches = fetches;
}
- public Map<String, List<FetchImpl>> getFetches() {
+ public Map<String, List<FetchProto>> getFetches() {
return fetches;
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1f9ae1da/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
index e64cd51..bd8311f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
@@ -25,8 +25,10 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.ResourceProtos.FetchProto;
import org.apache.tajo.SessionVars;
import org.apache.tajo.algebra.JoinType;
+import org.apache.tajo.annotation.NotNull;
import org.apache.tajo.catalog.*;
import org.apache.tajo.catalog.statistics.StatisticsUtil;
import org.apache.tajo.catalog.statistics.TableStats;
@@ -48,7 +50,9 @@ import org.apache.tajo.plan.logical.SortNode.SortPurpose;
import org.apache.tajo.plan.serder.PlanProto.DistinctGroupbyEnforcer.MultipleAggregationStage;
import org.apache.tajo.plan.serder.PlanProto.EnforceProperty;
import org.apache.tajo.plan.util.PlannerUtil;
+import org.apache.tajo.pullserver.TajoPullServerService;
import org.apache.tajo.querymaster.Task.IntermediateEntry;
+import org.apache.tajo.querymaster.Task.PullHost;
import org.apache.tajo.storage.*;
import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.storage.fragment.Fragment;
@@ -57,13 +61,17 @@ import org.apache.tajo.util.Pair;
import org.apache.tajo.util.TUtil;
import org.apache.tajo.util.TajoIdUtils;
import org.apache.tajo.worker.FetchImpl;
+import org.apache.tajo.worker.FetchImpl.RangeParam;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.math.BigInteger;
import java.net.URI;
+import java.net.URLEncoder;
import java.util.*;
import java.util.Map.Entry;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import static org.apache.tajo.plan.serder.PlanProto.ShuffleType;
import static org.apache.tajo.plan.serder.PlanProto.ShuffleType.*;
@@ -75,7 +83,6 @@ import static org.apache.tajo.plan.serder.PlanProto.ShuffleType.*;
public class Repartitioner {
private static final Log LOG = LogFactory.getLog(Repartitioner.class);
- private final static int HTTP_REQUEST_MAXIMUM_LENGTH = 1900;
private final static String UNKNOWN_HOST = "unknown";
public static void scheduleFragmentsForJoinQuery(TaskSchedulerContext schedulerContext, Stage stage)
@@ -546,12 +553,13 @@ public class Repartitioner {
private static void addJoinShuffle(Stage stage, int partitionId,
Map<ExecutionBlockId, List<IntermediateEntry>> grouppedPartitions) {
- Map<String, List<FetchImpl>> fetches = new HashMap<>();
+ Map<String, List<FetchProto>> fetches = new HashMap<>();
for (ExecutionBlock execBlock : stage.getMasterPlan().getChilds(stage.getId())) {
if (grouppedPartitions.containsKey(execBlock.getId())) {
- Collection<FetchImpl> requests = mergeShuffleRequest(partitionId, HASH_SHUFFLE,
+ String name = execBlock.getId().toString();
+ List<FetchProto> requests = mergeShuffleRequest(name, partitionId, HASH_SHUFFLE,
grouppedPartitions.get(execBlock.getId()));
- fetches.put(execBlock.getId().toString(), Lists.newArrayList(requests));
+ fetches.put(name, requests);
}
}
@@ -568,9 +576,10 @@ public class Repartitioner {
*
* @return a list of merged fetch requests, one per pull host and execution block
*/
- private static Collection<FetchImpl> mergeShuffleRequest(int partitionId,
- ShuffleType type,
- List<IntermediateEntry> partitions) {
+ private static List<FetchProto> mergeShuffleRequest(final String fetchName,
+ final int partitionId,
+ final ShuffleType type,
+ final List<IntermediateEntry> partitions) {
// ebId + pullhost -> FetchImpl
Map<String, FetchImpl> mergedPartitions = new HashMap<>();
@@ -582,12 +591,15 @@ public class Repartitioner {
fetch.addPart(partition.getTaskId(), partition.getAttemptId());
} else {
// In some cases like union each IntermediateEntry has different EBID.
- FetchImpl fetch = new FetchImpl(partition.getPullHost(), type, partition.getEbId(), partitionId);
+ FetchImpl fetch = new FetchImpl(fetchName, partition.getPullHost(), type, partition.getEbId(), partitionId);
fetch.addPart(partition.getTaskId(), partition.getAttemptId());
mergedPartitions.put(mergedKey, fetch);
}
}
- return mergedPartitions.values();
+
+ return mergedPartitions.values().stream()
+ .map(fetch -> fetch.getProto())
+ .collect(Collectors.toList());
}
public static void scheduleFragmentsForNonLeafTasks(TaskSchedulerContext schedulerContext,
@@ -699,44 +711,44 @@ public class Repartitioner {
new String[]{UNKNOWN_HOST});
Stage.scheduleFragment(stage, dummyFragment);
- List<FetchImpl> fetches = new ArrayList<>();
+ Map<Pair<PullHost, ExecutionBlockId>, FetchImpl> fetches = new HashMap<>();
List<ExecutionBlock> childBlocks = masterPlan.getChilds(stage.getId());
for (ExecutionBlock childBlock : childBlocks) {
Stage childExecSM = stage.getContext().getStage(childBlock.getId());
for (Task qu : childExecSM.getTasks()) {
for (IntermediateEntry p : qu.getIntermediateData()) {
- FetchImpl fetch = new FetchImpl(p.getPullHost(), RANGE_SHUFFLE, childBlock.getId(), 0);
- fetch.addPart(p.getTaskId(), p.getAttemptId());
- fetches.add(fetch);
+ Pair<PullHost, ExecutionBlockId> key = new Pair<>(p.getPullHost(), childBlock.getId());
+ if (fetches.containsKey(key)) {
+ fetches.get(key).addPart(p.getTaskId(), p.getAttemptId());
+ } else {
+ FetchImpl fetch = new FetchImpl(scan.getTableName(), p.getPullHost(), RANGE_SHUFFLE, childBlock.getId(), 0);
+ fetch.addPart(p.getTaskId(), p.getAttemptId());
+ fetches.put(key, fetch);
+ }
}
}
}
- SortedMap<TupleRange, Collection<FetchImpl>> map;
+ SortedMap<TupleRange, Collection<FetchProto>> map;
map = new TreeMap<>();
- Set<FetchImpl> fetchSet;
- try {
- RowStoreUtil.RowStoreEncoder encoder = RowStoreUtil.createEncoder(sortSchema);
- for (int i = 0; i < ranges.length; i++) {
- fetchSet = new HashSet<>();
- for (FetchImpl fetch: fetches) {
- String rangeParam =
- TupleUtil.rangeToQuery(ranges[i], i == (ranges.length - 1) , encoder);
- FetchImpl copy = null;
- try {
- copy = fetch.clone();
- } catch (CloneNotSupportedException e) {
- throw new RuntimeException(e);
- }
- copy.setRangeParams(rangeParam);
- fetchSet.add(copy);
+ Set<FetchProto> fetchSet;
+ RowStoreUtil.RowStoreEncoder encoder = RowStoreUtil.createEncoder(sortSchema);
+ for (int i = 0; i < ranges.length; i++) {
+ fetchSet = new HashSet<>();
+ RangeParam rangeParam = new RangeParam(ranges[i], i == (ranges.length - 1), encoder);
+ for (FetchImpl fetch : fetches.values()) {
+ FetchImpl copy = null;
+ try {
+ copy = fetch.clone();
+ } catch (CloneNotSupportedException e) {
+ throw new RuntimeException(e);
}
- map.put(ranges[i], fetchSet);
+ copy.setRangeParams(rangeParam);
+ fetchSet.add(copy.getProto());
}
- } catch (UnsupportedEncodingException e) {
- LOG.error(e);
+ map.put(ranges[i], fetchSet);
}
scheduleFetchesByRoundRobin(stage, map, scan.getTableName(), determinedTaskNum);
@@ -744,20 +756,20 @@ public class Repartitioner {
schedulerContext.setEstimatedTaskNum(determinedTaskNum);
}
- public static void scheduleFetchesByRoundRobin(Stage stage, Map<?, Collection<FetchImpl>> partitions,
- String tableName, int num) {
+ public static void scheduleFetchesByRoundRobin(Stage stage, Map<?, Collection<FetchProto>> partitions,
+ String tableName, int num) {
int i;
- Map<String, List<FetchImpl>>[] fetchesArray = new Map[num];
+ Map<String, List<FetchProto>>[] fetchesArray = new Map[num];
for (i = 0; i < num; i++) {
fetchesArray[i] = new HashMap<>();
}
i = 0;
- for (Entry<?, Collection<FetchImpl>> entry : partitions.entrySet()) {
- Collection<FetchImpl> value = entry.getValue();
+ for (Entry<?, Collection<FetchProto>> entry : partitions.entrySet()) {
+ Collection<FetchProto> value = entry.getValue();
TUtil.putCollectionToNestedList(fetchesArray[i++], tableName, value);
if (i == num) i = 0;
}
- for (Map<String, List<FetchImpl>> eachFetches : fetchesArray) {
+ for (Map<String, List<FetchProto>> eachFetches : fetchesArray) {
Stage.scheduleFetches(stage, eachFetches);
}
}
@@ -785,6 +797,10 @@ public class Repartitioner {
return totalVolume;
}
+ public List<FetchProto> getFetchProtos() {
+ return fetchUrls.stream().map(fetch -> fetch.getProto()).collect(Collectors.toList());
+ }
+
}
public static void scheduleHashShuffledFetches(TaskSchedulerContext schedulerContext, MasterPlan masterPlan,
@@ -821,7 +837,7 @@ public class Repartitioner {
Map<Task.PullHost, List<IntermediateEntry>> hashedByHost = hashByHost(interm.getValue());
for (Entry<Task.PullHost, List<IntermediateEntry>> e : hashedByHost.entrySet()) {
- FetchImpl fetch = new FetchImpl(e.getKey(), channel.getShuffleType(),
+ FetchImpl fetch = new FetchImpl(scan.getTableName(), e.getKey(), channel.getShuffleType(),
block.getId(), interm.getKey(), e.getValue());
long volumeSum = 0;
@@ -891,20 +907,16 @@ public class Repartitioner {
}
}
- public static Pair<Long [], Map<String, List<FetchImpl>>[]> makeEvenDistributedFetchImpl(
+ public static Pair<Long [], Map<String, List<FetchProto>>[]> makeEvenDistributedFetchImpl(
Map<Integer, FetchGroupMeta> partitions, String tableName, int num) {
// Sort fetchGroupMeta in a descending order of data volumes.
List<FetchGroupMeta> fetchGroupMetaList = Lists.newArrayList(partitions.values());
- Collections.sort(fetchGroupMetaList, new Comparator<FetchGroupMeta>() {
- @Override
- public int compare(FetchGroupMeta o1, FetchGroupMeta o2) {
- return o1.getVolume() < o2.getVolume() ? 1 : (o1.getVolume() > o2.getVolume() ? -1 : 0);
- }
- });
+ Collections.sort(fetchGroupMetaList, (o1, o2) ->
+ o1.getVolume() < o2.getVolume() ? 1 : (o1.getVolume() > o2.getVolume() ? -1 : 0));
// Initialize containers
- Map<String, List<FetchImpl>>[] fetchesArray = new Map[num];
+ Map<String, List<FetchProto>>[] fetchesArray = new Map[num];
Long [] assignedVolumes = new Long[num];
// initialization
for (int i = 0; i < num; i++) {
@@ -925,7 +937,7 @@ public class Repartitioner {
FetchGroupMeta fetchGroupMeta = iterator.next();
assignedVolumes[p] += fetchGroupMeta.getVolume();
- TUtil.putCollectionToNestedList(fetchesArray[p], tableName, fetchGroupMeta.fetchUrls);
+ TUtil.putCollectionToNestedList(fetchesArray[p], tableName, fetchGroupMeta.getFetchProtos());
p++;
}
@@ -933,13 +945,13 @@ public class Repartitioner {
while (p >= 0 && iterator.hasNext()) {
FetchGroupMeta fetchGroupMeta = iterator.next();
assignedVolumes[p] += fetchGroupMeta.getVolume();
- TUtil.putCollectionToNestedList(fetchesArray[p], tableName, fetchGroupMeta.fetchUrls);
+ TUtil.putCollectionToNestedList(fetchesArray[p], tableName, fetchGroupMeta.getFetchProtos());
// While the current one is smaller than next one, it adds additional fetches to current one.
while(iterator.hasNext() && (p > 0 && assignedVolumes[p - 1] > assignedVolumes[p])) {
FetchGroupMeta additionalFetchGroup = iterator.next();
assignedVolumes[p] += additionalFetchGroup.getVolume();
- TUtil.putCollectionToNestedList(fetchesArray[p], tableName, additionalFetchGroup.fetchUrls);
+ TUtil.putCollectionToNestedList(fetchesArray[p], tableName, additionalFetchGroup.getFetchProtos());
}
p--;
@@ -951,9 +963,9 @@ public class Repartitioner {
public static void scheduleFetchesByEvenDistributedVolumes(Stage stage, Map<Integer, FetchGroupMeta> partitions,
String tableName, int num) {
- Map<String, List<FetchImpl>>[] fetchsArray = makeEvenDistributedFetchImpl(partitions, tableName, num).getSecond();
+ Map<String, List<FetchProto>>[] fetchsArray = makeEvenDistributedFetchImpl(partitions, tableName, num).getSecond();
// Schedule FetchImpls
- for (Map<String, List<FetchImpl>> eachFetches : fetchsArray) {
+ for (Map<String, List<FetchProto>> eachFetches : fetchsArray) {
Stage.scheduleFetches(stage, eachFetches);
}
}
@@ -976,7 +988,7 @@ public class Repartitioner {
throw new RuntimeException("tajo.dist-query.table-partition.task-volume-mb should be greater than " +
"tajo.shuffle.hash.appender.page.volumn-mb");
}
- List<List<FetchImpl>> fetches = new ArrayList<>();
+ List<List<FetchProto>> fetches = new ArrayList<>();
long totalIntermediateSize = 0L;
for (Entry<ExecutionBlockId, List<IntermediateEntry>> listEntry : intermediates.entrySet()) {
@@ -996,7 +1008,7 @@ public class Repartitioner {
// Grouping or splitting to fit $DIST_QUERY_TABLE_PARTITION_VOLUME size
for (List<IntermediateEntry> partitionEntries : partitionIntermMap.values()) {
- List<List<FetchImpl>> eachFetches = splitOrMergeIntermediates(listEntry.getKey(), partitionEntries,
+ List<List<FetchProto>> eachFetches = splitOrMergeIntermediates(tableName, listEntry.getKey(), partitionEntries,
splitVolume, pageSize);
if (eachFetches != null && !eachFetches.isEmpty()) {
fetches.addAll(eachFetches);
@@ -1007,8 +1019,8 @@ public class Repartitioner {
schedulerContext.setEstimatedTaskNum(fetches.size());
int i = 0;
- Map<String, List<FetchImpl>>[] fetchesArray = new Map[fetches.size()];
- for(List<FetchImpl> entry : fetches) {
+ Map<String, List<FetchProto>>[] fetchesArray = new Map[fetches.size()];
+ for(List<FetchProto> entry : fetches) {
fetchesArray[i] = new HashMap<>();
fetchesArray[i].put(tableName, entry);
@@ -1030,16 +1042,16 @@ public class Repartitioner {
* @param splitVolume
* @return
*/
- public static List<List<FetchImpl>> splitOrMergeIntermediates(
+ public static List<List<FetchProto>> splitOrMergeIntermediates(@NotNull String fetchName,
ExecutionBlockId ebId, List<IntermediateEntry> entries, long splitVolume, long pageSize) {
// Each List<FetchImpl> has splitVolume size.
- List<List<FetchImpl>> fetches = new ArrayList<>();
+ List<List<FetchProto>> fetches = new ArrayList<>();
Iterator<IntermediateEntry> iter = entries.iterator();
if (!iter.hasNext()) {
return null;
}
- List<FetchImpl> fetchListForSingleTask = new ArrayList<>();
+ List<FetchProto> fetchListForSingleTask = new ArrayList<>();
long fetchListVolume = 0;
while (iter.hasNext()) {
@@ -1065,11 +1077,11 @@ public class Repartitioner {
fetchListForSingleTask = new ArrayList<>();
fetchListVolume = 0;
}
- FetchImpl fetch = new FetchImpl(currentInterm.getPullHost(), SCATTERED_HASH_SHUFFLE,
+ FetchImpl fetch = new FetchImpl(fetchName, currentInterm.getPullHost(), SCATTERED_HASH_SHUFFLE,
ebId, currentInterm.getPartId(), TUtil.newList(currentInterm));
fetch.setOffset(eachSplit.getFirst());
fetch.setLength(eachSplit.getSecond());
- fetchListForSingleTask.add(fetch);
+ fetchListForSingleTask.add(fetch.getProto());
fetchListVolume += eachSplit.getSecond();
}
}
@@ -1079,19 +1091,56 @@ public class Repartitioner {
return fetches;
}
- public static List<URI> createFetchURL(FetchImpl fetch, boolean includeParts) {
+ /**
+ * Get the pull server URIs.
+ */
+ public static List<URI> createFullURIs(int maxUrlLength, FetchProto fetch) {
+ return createFetchURL(maxUrlLength, fetch, true);
+ }
+
+ /**
+ * Get the pull server URIs without repeated parameters.
+ */
+ public static List<URI> createSimpleURIs(int maxUrlLength, FetchProto fetch) {
+ return createFetchURL(maxUrlLength, fetch, false);
+ }
+
+ private static String getRangeParam(FetchProto proto) {
+ StringBuilder sb = new StringBuilder();
+ String firstKeyBase64 = new String(org.apache.commons.codec.binary.Base64.encodeBase64(proto.getRangeStart().toByteArray()));
+ String lastKeyBase64 = new String(org.apache.commons.codec.binary.Base64.encodeBase64(proto.getRangeEnd().toByteArray()));
+
+ try {
+ sb.append("start=")
+ .append(URLEncoder.encode(firstKeyBase64, "utf-8"))
+ .append("&")
+ .append("end=")
+ .append(URLEncoder.encode(lastKeyBase64, "utf-8"));
+ } catch (UnsupportedEncodingException e) {
+ throw new RuntimeException(e);
+ }
+
+ if (proto.getRangeLastInclusive()) {
+ sb.append("&final=true");
+ }
+
+ return sb.toString();
+ }
+
+ public static List<URI> createFetchURL(int maxUrlLength, FetchProto fetch, boolean includeParts) {
String scheme = "http://";
StringBuilder urlPrefix = new StringBuilder(scheme);
- urlPrefix.append(fetch.getPullHost().getHost()).append(":").append(fetch.getPullHost().getPort()).append("/?")
- .append("qid=").append(fetch.getExecutionBlockId().getQueryId().toString())
- .append("&sid=").append(fetch.getExecutionBlockId().getId())
+ ExecutionBlockId ebId = new ExecutionBlockId(fetch.getExecutionBlockId());
+ urlPrefix.append(fetch.getHost()).append(":").append(fetch.getPort()).append("/?")
+ .append("qid=").append(ebId.getQueryId().toString())
+ .append("&sid=").append(ebId.getId())
.append("&p=").append(fetch.getPartitionId())
.append("&type=");
if (fetch.getType() == HASH_SHUFFLE) {
urlPrefix.append("h");
} else if (fetch.getType() == RANGE_SHUFFLE) {
- urlPrefix.append("r").append("&").append(fetch.getRangeParams());
+ urlPrefix.append("r").append("&").append(getRangeParam(fetch));
} else if (fetch.getType() == SCATTERED_HASH_SHUFFLE) {
urlPrefix.append("s");
}
@@ -1105,17 +1154,26 @@ public class Repartitioner {
if (fetch.getType() == HASH_SHUFFLE || fetch.getType() == SCATTERED_HASH_SHUFFLE) {
fetchURLs.add(URI.create(urlPrefix.toString()));
} else {
+ urlPrefix.append("&ta=");
// If the get request is longer than 2000 characters,
// the long request uri may cause HTTP Status Code - 414 Request-URI Too Long.
// Refer to http://www.w3.org/Protocols/rfc2616/rfc2616-sec10.html#sec10.4.15
// The below code transforms a long request to multiple requests.
List<String> taskIdsParams = new ArrayList<>();
StringBuilder taskIdListBuilder = new StringBuilder();
- List<Integer> taskIds = fetch.getTaskIds();
- List<Integer> attemptIds = fetch.getAttemptIds();
+
+ final List<Integer> taskIds = fetch.getTaskIdList();
+ final List<Integer> attemptIds = fetch.getAttemptIdList();
+
+ // Sort task ids to increase cache hit in pull server
+ final List<Pair<Integer, Integer>> taskAndAttemptIds = IntStream.range(0, taskIds.size())
+ .mapToObj(i -> new Pair<>(taskIds.get(i), attemptIds.get(i)))
+ .sorted((p1, p2) -> p1.getFirst() - p2.getFirst())
+ .collect(Collectors.toList());
+
boolean first = true;
- for (int i = 0; i < taskIds.size(); i++) {
+ for (int i = 0; i < taskAndAttemptIds.size(); i++) {
StringBuilder taskAttemptId = new StringBuilder();
if (!first) { // a comma separates successive task attempt ids
@@ -1124,17 +1182,16 @@ public class Repartitioner {
first = false;
}
- int taskId = taskIds.get(i);
+ int taskId = taskAndAttemptIds.get(i).getFirst();
if (taskId < 0) {
// In the case of hash shuffle each partition has single shuffle file per worker.
// TODO If file is large, consider multiple fetching(shuffle file can be split)
continue;
}
- int attemptId = attemptIds.get(i);
+ int attemptId = taskAndAttemptIds.get(i).getSecond();
taskAttemptId.append(taskId).append("_").append(attemptId);
- if (taskIdListBuilder.length() + taskAttemptId.length()
- > HTTP_REQUEST_MAXIMUM_LENGTH) {
+ if (urlPrefix.length() + taskIdListBuilder.length() > maxUrlLength) {
taskIdsParams.add(taskIdListBuilder.toString());
taskIdListBuilder = new StringBuilder(taskId + "_" + attemptId);
} else {
@@ -1145,7 +1202,6 @@ public class Repartitioner {
if (taskIdListBuilder.length() > 0) {
taskIdsParams.add(taskIdListBuilder.toString());
}
- urlPrefix.append("&ta=");
for (String param : taskIdsParams) {
fetchURLs.add(URI.create(urlPrefix + param));
}
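
The hunk above replaces the hard-coded HTTP_REQUEST_MAXIMUM_LENGTH of 1900 with the configurable PULLSERVER_FETCH_URL_MAX_LENGTH and sorts task ids before building the ta= parameter, so a fetch that covers many task attempts is spread over several GET requests instead of one URI long enough to trigger HTTP 414. The following is a minimal standalone sketch of that packing step, with a hypothetical packTaskIds helper and made-up prefix and limit; it is not the committed Repartitioner code.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class UrlSplitSketch {

  // Packs "taskId_attemptId" tokens into comma-separated groups so that
  // urlPrefix.length() + group.length() never exceeds maxUrlLength.
  static List<String> packTaskIds(String urlPrefix, List<String> tokens, int maxUrlLength) {
    List<String> groups = new ArrayList<>();
    StringBuilder current = new StringBuilder();
    for (String token : tokens) {
      // a comma is needed only when the current group is non-empty
      int extra = current.length() == 0 ? token.length() : token.length() + 1;
      if (current.length() > 0 && urlPrefix.length() + current.length() + extra > maxUrlLength) {
        groups.add(current.toString());          // flush the full group
        current = new StringBuilder(token);
      } else {
        if (current.length() > 0) {
          current.append(',');
        }
        current.append(token);
      }
    }
    if (current.length() > 0) {
      groups.add(current.toString());
    }
    return groups;
  }

  public static void main(String[] args) {
    String prefix = "http://host:port/?qid=q1&sid=2&p=0&type=s&ta=";
    List<String> tokens = Arrays.asList("0_1", "1_1", "2_1", "3_1");
    // With a small limit the ids come back as two groups instead of one long URI.
    for (String group : packTaskIds(prefix, tokens, 55)) {
      System.out.println(prefix + group);
    }
  }
}

In the committed code each group is appended to the shared urlPrefix in the same way, and the limit comes from the new configuration variable rather than a constant.
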
http://git-wip-us.apache.org/repos/asf/tajo/blob/1f9ae1da/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
index 254df64..5f050bf 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
@@ -65,7 +65,6 @@ import org.apache.tajo.util.KeyValueSet;
import org.apache.tajo.util.RpcParameterFactory;
import org.apache.tajo.util.history.StageHistory;
import org.apache.tajo.util.history.TaskHistory;
-import org.apache.tajo.worker.FetchImpl;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -1189,7 +1188,7 @@ public class Stage implements EventHandler<StageEvent> {
stage.getId(), leftFragment, rightFragments));
}
- public static void scheduleFetches(Stage stage, Map<String, List<FetchImpl>> fetches) {
+ public static void scheduleFetches(Stage stage, Map<String, List<FetchProto>> fetches) {
stage.taskScheduler.handle(new FetchScheduleEvent(TaskSchedulerEvent.EventType.T_SCHEDULE,
stage.getId(), fetches));
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1f9ae1da/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java
index 95a7170..9d038ac 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java
@@ -34,6 +34,7 @@ import org.apache.tajo.TajoProtos.TaskAttemptState;
import org.apache.tajo.TaskAttemptId;
import org.apache.tajo.TaskId;
import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.master.TaskState;
import org.apache.tajo.master.cluster.WorkerConnectionInfo;
import org.apache.tajo.master.event.*;
@@ -46,7 +47,6 @@ import org.apache.tajo.storage.fragment.FragmentConvertor;
import org.apache.tajo.util.Pair;
import org.apache.tajo.util.TajoIdUtils;
import org.apache.tajo.util.history.TaskHistory;
-import org.apache.tajo.worker.FetchImpl;
import java.net.URI;
import java.util.*;
@@ -55,8 +55,8 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
import static org.apache.tajo.ResourceProtos.*;
+import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
public class Task implements EventHandler<TaskEvent> {
/** Class Logger */
@@ -70,7 +70,7 @@ public class Task implements EventHandler<TaskEvent> {
private List<ScanNode> scan;
private Map<String, Set<FragmentProto>> fragMap;
- private Map<String, Set<FetchImpl>> fetchMap;
+ private Map<String, Set<FetchProto>> fetchMap;
private int totalFragmentNum;
@@ -100,6 +100,8 @@ public class Task implements EventHandler<TaskEvent> {
private TaskHistory finalTaskHistory;
+ private final int maxUrlLength;
+
protected static final StateMachineFactory
<Task, TaskState, TaskEventType, TaskEvent> stateMachineFactory =
new StateMachineFactory <Task, TaskState, TaskEventType, TaskEvent>(TaskState.NEW)
@@ -207,6 +209,8 @@ public class Task implements EventHandler<TaskEvent> {
stateMachine = stateMachineFactory.make(this);
totalFragmentNum = 0;
+ maxUrlLength = conf.getInt(ConfVars.PULLSERVER_FETCH_URL_MAX_LENGTH.name(),
+ ConfVars.PULLSERVER_FETCH_URL_MAX_LENGTH.defaultIntVal);
}
public boolean isLeafTask() {
@@ -282,9 +286,9 @@ public class Task implements EventHandler<TaskEvent> {
taskHistory.setFragments(fragmentList.toArray(new String[fragmentList.size()]));
List<String[]> fetchList = new ArrayList<>();
- for (Map.Entry<String, Set<FetchImpl>> e : getFetchMap().entrySet()) {
- for (FetchImpl f : e.getValue()) {
- for (URI uri : f.getSimpleURIs()){
+ for (Map.Entry<String, Set<FetchProto>> e : getFetchMap().entrySet()) {
+ for (FetchProto f : e.getValue()) {
+ for (URI uri : Repartitioner.createSimpleURIs(maxUrlLength, f)) {
fetchList.add(new String[] {e.getKey(), uri.toString()});
}
}
@@ -364,8 +368,8 @@ public class Task implements EventHandler<TaskEvent> {
return succeededWorker;
}
- public void addFetches(String tableId, Collection<FetchImpl> fetches) {
- Set<FetchImpl> fetchSet;
+ public void addFetches(String tableId, Collection<FetchProto> fetches) {
+ Set<FetchProto> fetchSet;
if (fetchMap.containsKey(tableId)) {
fetchSet = fetchMap.get(tableId);
} else {
@@ -375,7 +379,7 @@ public class Task implements EventHandler<TaskEvent> {
fetchMap.put(tableId, fetchSet);
}
- public void setFetches(Map<String, Set<FetchImpl>> fetches) {
+ public void setFetches(Map<String, Set<FetchProto>> fetches) {
this.fetchMap.clear();
this.fetchMap.putAll(fetches);
}
@@ -395,27 +399,15 @@ public class Task implements EventHandler<TaskEvent> {
public TaskId getId() {
return taskId;
}
-
- public Collection<FetchImpl> getFetchHosts(String tableId) {
- return fetchMap.get(tableId);
- }
-
- public Collection<Set<FetchImpl>> getFetches() {
+
+ public Collection<Set<FetchProto>> getFetches() {
return fetchMap.values();
}
- public Map<String, Set<FetchImpl>> getFetchMap() {
+ public Map<String, Set<FetchProto>> getFetchMap() {
return fetchMap;
}
-
- public Collection<FetchImpl> getFetch(ScanNode scan) {
- return this.fetchMap.get(scan.getTableName());
- }
-
- public ScanNode[] getScanNodes() {
- return this.scan.toArray(new ScanNode[scan.size()]);
- }
-
+
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
@@ -426,10 +418,10 @@ public class Task implements EventHandler<TaskEvent> {
builder.append(fragment).append(", ");
}
}
- for (Entry<String, Set<FetchImpl>> e : fetchMap.entrySet()) {
+ for (Entry<String, Set<FetchProto>> e : fetchMap.entrySet()) {
builder.append(e.getKey()).append(" : ");
- for (FetchImpl t : e.getValue()) {
- for (URI uri : t.getURIs()){
+ for (FetchProto t : e.getValue()) {
+ for (URI uri : Repartitioner.createFullURIs(maxUrlLength, t)){
builder.append(uri).append(" ");
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1f9ae1da/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
index 65cb6ac..098567a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
@@ -20,6 +20,12 @@ package org.apache.tajo.worker;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.*;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.http.*;
+import io.netty.handler.codec.http.HttpHeaders.Names;
+import io.netty.handler.codec.http.HttpHeaders.Values;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
@@ -32,20 +38,20 @@ import org.apache.tajo.TajoProtos;
import org.apache.tajo.TaskAttemptId;
import org.apache.tajo.TaskId;
import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.ipc.QueryMasterProtocol;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
import org.apache.tajo.plan.serder.PlanProto;
import org.apache.tajo.pullserver.TajoPullServerService;
-import org.apache.tajo.rpc.AsyncRpcClient;
-import org.apache.tajo.rpc.CallFuture;
-import org.apache.tajo.rpc.NullCallback;
-import org.apache.tajo.rpc.RpcClientManager;
+import org.apache.tajo.rpc.*;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
import org.apache.tajo.storage.HashShuffleAppenderManager;
import org.apache.tajo.util.Pair;
import org.apache.tajo.worker.event.ExecutionBlockErrorEvent;
import java.io.IOException;
+import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -191,10 +197,64 @@ public class ExecutionBlockContext {
}
tasks.clear();
taskHistories.clear();
+
+ // Clear index cache in pull server
+ clearIndexCache();
+
resource.release();
RpcClientManager.cleanup(queryMasterClient);
}
+ /**
+ * Send a request to {@link TajoPullServerService} to clear index cache
+ */
+ private void clearIndexCache() {
+ // Avoid unnecessary cache clear request when the current eb is a leaf eb
+ if (executionBlockId.getId() > 1) {
+ Bootstrap bootstrap = new Bootstrap()
+ .group(NettyUtils.getSharedEventLoopGroup(NettyUtils.GROUP.FETCHER, 1))
+ .channel(NioSocketChannel.class)
+ .option(ChannelOption.ALLOCATOR, NettyUtils.ALLOCATOR)
+ .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 30 * 1000)
+ .option(ChannelOption.TCP_NODELAY, true);
+ ChannelInitializer<Channel> initializer = new ChannelInitializer<Channel>() {
+ @Override
+ protected void initChannel(Channel channel) throws Exception {
+ ChannelPipeline pipeline = channel.pipeline();
+ pipeline.addLast("codec", new HttpClientCodec());
+ }
+ };
+ bootstrap.handler(initializer);
+
+ WorkerConnectionInfo connInfo = workerContext.getConnectionInfo();
+ ChannelFuture future = bootstrap.connect(new InetSocketAddress(connInfo.getHost(), connInfo.getPullServerPort()))
+ .addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
+
+ try {
+ Channel channel = future.await().channel();
+ if (!future.isSuccess()) {
+ // Upon failure to connect to pull server, cache clear message is just ignored.
+ LOG.warn(future.cause());
+ return;
+ }
+
+ // Example of URI: /ebid=eb_1450063997899_0015_000002
+ ExecutionBlockId clearEbId = new ExecutionBlockId(executionBlockId.getQueryId(), executionBlockId.getId() - 1);
+ HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.DELETE, "ebid=" + clearEbId.toString());
+ request.headers().set(Names.HOST, connInfo.getHost());
+ request.headers().set(Names.CONNECTION, Values.CLOSE);
+ channel.writeAndFlush(request);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ } finally {
+ if (future != null && future.channel().isOpen()) {
+ // Close the channel to exit.
+ future.channel().close();
+ }
+ }
+ }
+ }
+
public TajoConf getConf() {
return systemConf;
}
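
The new clearIndexCache() above asks the worker's own pull server to drop the index cache of the previous (child) execution block by sending an HTTP DELETE whose request URI carries ebid=<execution block id>; connection failures are only logged and otherwise ignored. A rough equivalent of that request using java.net.HttpURLConnection, with placeholder host, port, and id, is sketched below; the patch itself issues the request through Netty as shown above.

import java.net.HttpURLConnection;
import java.net.URL;

public class ClearIndexCacheSketch {
  public static void main(String[] args) throws Exception {
    // Placeholders: the real code targets the worker's own pull server port
    // and the id of the previous (child) execution block.
    String pullServerHost = "localhost";
    int pullServerPort = 19999;
    String previousEbId = "eb_1450063997899_0015_000002";

    URL url = new URL("http://" + pullServerHost + ":" + pullServerPort + "/ebid=" + previousEbId);
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    conn.setRequestMethod("DELETE");                 // same method the patch sends via Netty
    conn.setRequestProperty("Connection", "close");
    // Reading the response code forces the request out; the patch itself is fire-and-forget.
    System.out.println("Pull server responded: " + conn.getResponseCode());
    conn.disconnect();
  }
}
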
http://git-wip-us.apache.org/repos/asf/tajo/blob/1f9ae1da/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java b/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java
index 7d2033c..b49d449 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java
@@ -21,16 +21,21 @@ package org.apache.tajo.worker;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
+import com.google.protobuf.ByteString;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.ResourceProtos.FetchProto;
import org.apache.tajo.common.ProtoObject;
-import org.apache.tajo.querymaster.Repartitioner;
import org.apache.tajo.querymaster.Task;
+import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder;
+import org.apache.tajo.storage.TupleRange;
+import org.apache.tajo.util.Pair;
import org.apache.tajo.util.TUtil;
-import java.net.URI;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import static org.apache.tajo.plan.serder.PlanProto.ShuffleType;
@@ -42,8 +47,8 @@ public class FetchImpl implements ProtoObject<FetchProto>, Cloneable {
private ShuffleType type; // hash or range partition method.
private ExecutionBlockId executionBlockId; // The executionBlock id
private int partitionId; // The hash partition id
- private String name; // The intermediate source name
- private String rangeParams; // optional, the http parameters of range partition. (e.g., start=xx&end=yy)
+ private final String name; // The intermediate source name
+ private RangeParam rangeParam; // optional, range parameter for range shuffle
private boolean hasNext = false; // optional, if true, has more taskIds
private List<Integer> taskIds; // repeated, the task ids
@@ -52,19 +57,48 @@ public class FetchImpl implements ProtoObject<FetchProto>, Cloneable {
private long offset = -1;
private long length = -1;
- public FetchImpl() {
- taskIds = new ArrayList<>();
- attemptIds = new ArrayList<>();
+ public static class RangeParam {
+ private byte[] start;
+ private byte[] end;
+ private boolean lastInclusive;
+
+ public RangeParam(TupleRange range, boolean lastInclusive, RowStoreEncoder encoder) {
+ this.start = encoder.toBytes(range.getStart());
+ this.end = encoder.toBytes(range.getEnd());
+ this.lastInclusive = lastInclusive;
+ }
+
+ public RangeParam(byte[] start, byte[] end, boolean lastInclusive) {
+ this.start = start;
+ this.end = end;
+ this.lastInclusive = lastInclusive;
+ }
+
+ public byte[] getStart() {
+ return start;
+ }
+
+ public byte[] getEnd() {
+ return end;
+ }
+
+ public boolean isLastInclusive() {
+ return lastInclusive;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(Arrays.hashCode(start), Arrays.hashCode(end), lastInclusive);
+ }
}
public FetchImpl(FetchProto proto) {
- this(new Task.PullHost(proto.getHost(), proto.getPort()),
+ this(proto.getName(),
+ new Task.PullHost(proto.getHost(), proto.getPort()),
proto.getType(),
new ExecutionBlockId(proto.getExecutionBlockId()),
proto.getPartitionId(),
- proto.getRangeParams(),
proto.getHasNext(),
- proto.getName(),
proto.getTaskIdList(), proto.getAttemptIdList());
if (proto.hasOffset()) {
@@ -74,31 +108,41 @@ public class FetchImpl implements ProtoObject<FetchProto>, Cloneable {
if (proto.hasLength()) {
this.length = proto.getLength();
}
+
+ if (proto.hasRangeStart()) {
+ this.rangeParam = new RangeParam(proto.getRangeStart().toByteArray(),
+ proto.getRangeEnd().toByteArray(), proto.getRangeLastInclusive());
+ }
}
- public FetchImpl(Task.PullHost host, ShuffleType type, ExecutionBlockId executionBlockId,
+ public FetchImpl(String name, Task.PullHost host, ShuffleType type, ExecutionBlockId executionBlockId,
int partitionId) {
- this(host, type, executionBlockId, partitionId, null, false, null,
- new ArrayList<>(), new ArrayList<>());
+ this(name, host, type, executionBlockId, partitionId, null, false, new ArrayList<>(), new ArrayList<>());
}
- public FetchImpl(Task.PullHost host, ShuffleType type, ExecutionBlockId executionBlockId,
+ public FetchImpl(String name, Task.PullHost host, ShuffleType type, ExecutionBlockId executionBlockId,
int partitionId, List<Task.IntermediateEntry> intermediateEntryList) {
- this(host, type, executionBlockId, partitionId, null, false, null,
+ this(name, host, type, executionBlockId, partitionId, null, false,
new ArrayList<>(), new ArrayList<>());
for (Task.IntermediateEntry entry : intermediateEntryList){
addPart(entry.getTaskId(), entry.getAttemptId());
}
}
- public FetchImpl(Task.PullHost host, ShuffleType type, ExecutionBlockId executionBlockId,
- int partitionId, String rangeParams, boolean hasNext, String name,
+ public FetchImpl(String name, Task.PullHost host, ShuffleType type, ExecutionBlockId executionBlockId,
+ int partitionId, boolean hasNext,
+ List<Integer> taskIds, List<Integer> attemptIds) {
+ this(name, host, type, executionBlockId, partitionId, null, hasNext, taskIds, attemptIds);
+ }
+
+ public FetchImpl(String name, Task.PullHost host, ShuffleType type, ExecutionBlockId executionBlockId,
+ int partitionId, RangeParam rangeParam, boolean hasNext,
List<Integer> taskIds, List<Integer> attemptIds) {
this.host = host;
this.type = type;
this.executionBlockId = executionBlockId;
this.partitionId = partitionId;
- this.rangeParams = rangeParams;
+ this.rangeParam = rangeParam;
this.hasNext = hasNext;
this.name = name;
this.taskIds = taskIds;
@@ -107,7 +151,7 @@ public class FetchImpl implements ProtoObject<FetchProto>, Cloneable {
@Override
public int hashCode() {
- return Objects.hashCode(host, type, executionBlockId, partitionId, name, rangeParams,
+ return Objects.hashCode(host, type, executionBlockId, partitionId, name, rangeParam.hashCode(),
hasNext, taskIds, attemptIds, offset, length);
}
@@ -123,11 +167,14 @@ public class FetchImpl implements ProtoObject<FetchProto>, Cloneable {
builder.setHasNext(hasNext);
builder.setName(name);
- if (rangeParams != null && !rangeParams.isEmpty()) {
- builder.setRangeParams(rangeParams);
+ if (rangeParam != null) {
+ builder.setRangeStart(ByteString.copyFrom(rangeParam.getStart()));
+ builder.setRangeEnd(ByteString.copyFrom(rangeParam.getEnd()));
+ builder.setRangeLastInclusive(rangeParam.isLastInclusive());
}
Preconditions.checkArgument(taskIds.size() == attemptIds.size());
+
builder.addAllTaskId(taskIds);
builder.addAllAttemptId(attemptIds);
@@ -141,10 +188,6 @@ public class FetchImpl implements ProtoObject<FetchProto>, Cloneable {
this.attemptIds.add(attemptId);
}
- public Task.PullHost getPullHost() {
- return this.host;
- }
-
public ExecutionBlockId getExecutionBlockId() {
return executionBlockId;
}
@@ -153,20 +196,8 @@ public class FetchImpl implements ProtoObject<FetchProto>, Cloneable {
this.executionBlockId = executionBlockId;
}
- public int getPartitionId() {
- return partitionId;
- }
-
- public void setPartitionId(int partitionId) {
- this.partitionId = partitionId;
- }
-
- public String getRangeParams() {
- return rangeParams;
- }
-
- public void setRangeParams(String rangeParams) {
- this.rangeParams = rangeParams;
+ public void setRangeParams(RangeParam rangeParams) {
+ this.rangeParam = rangeParams;
}
public boolean hasNext() {
@@ -185,36 +216,10 @@ public class FetchImpl implements ProtoObject<FetchProto>, Cloneable {
this.type = type;
}
- /**
- * Get the pull server URIs.
- */
- public List<URI> getURIs(){
- return Repartitioner.createFetchURL(this, true);
- }
-
- /**
- * Get the pull server URIs without repeated parameters.
- */
- public List<URI> getSimpleURIs(){
- return Repartitioner.createFetchURL(this, false);
- }
-
public String getName() {
return name;
}
- public void setName(String name) {
- this.name = name;
- }
-
- public List<Integer> getTaskIds() {
- return taskIds;
- }
-
- public List<Integer> getAttemptIds() {
- return attemptIds;
- }
-
public long getOffset() {
return offset;
}
@@ -238,8 +243,7 @@ public class FetchImpl implements ProtoObject<FetchProto>, Cloneable {
newFetchImpl.type = type;
newFetchImpl.executionBlockId = executionBlockId;
newFetchImpl.partitionId = partitionId;
- newFetchImpl.name = name;
- newFetchImpl.rangeParams = rangeParams;
+ newFetchImpl.rangeParam = rangeParam;
newFetchImpl.hasNext = hasNext;
if (taskIds != null) {
newFetchImpl.taskIds = Lists.newArrayList(taskIds);
@@ -269,7 +273,7 @@ public class FetchImpl implements ProtoObject<FetchProto>, Cloneable {
TUtil.checkEquals(executionBlockId, fetch.executionBlockId) &&
TUtil.checkEquals(host, fetch.host) &&
TUtil.checkEquals(name, fetch.name) &&
- TUtil.checkEquals(rangeParams, fetch.rangeParams) &&
+ TUtil.checkEquals(rangeParam, fetch.rangeParam) &&
TUtil.checkEquals(taskIds, fetch.taskIds) &&
TUtil.checkEquals(type, fetch.type) &&
TUtil.checkEquals(offset, fetch.offset) &&
http://git-wip-us.apache.org/repos/asf/tajo/blob/1f9ae1da/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java b/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
index b5abffe..250b4cc 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
@@ -31,7 +31,9 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.IOUtils;
import org.apache.tajo.TajoProtos;
+import org.apache.tajo.TajoProtos.FetcherState;
import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.pullserver.TajoPullServerService;
import org.apache.tajo.pullserver.retriever.FileChunk;
import org.apache.tajo.rpc.NettyUtils;
@@ -42,6 +44,8 @@ import java.io.RandomAccessFile;
import java.net.InetSocketAddress;
import java.net.URI;
import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.TimeUnit;
/**
@@ -67,6 +71,7 @@ public class Fetcher {
private TajoProtos.FetcherState state;
private Bootstrap bootstrap;
+ private List<Long> chunkLengths = new ArrayList<>();
public Fetcher(TajoConf conf, URI uri, FileChunk chunk) {
this.uri = uri;
@@ -97,9 +102,6 @@ public class Fetcher {
conf.getIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_CONNECT_TIMEOUT) * 1000)
.option(ChannelOption.SO_RCVBUF, 1048576) // set 1M
.option(ChannelOption.TCP_NODELAY, true);
-
- ChannelInitializer<Channel> initializer = new HttpClientChannelInitializer(fileChunk.getFile());
- bootstrap.handler(initializer);
}
}
@@ -123,12 +125,20 @@ public class Fetcher {
return messageReceiveCount;
}
- public FileChunk get() throws IOException {
+ public List<FileChunk> get() throws IOException {
+ List<FileChunk> fileChunks = new ArrayList<>();
if (useLocalFile) {
startTime = System.currentTimeMillis();
finishTime = System.currentTimeMillis();
state = TajoProtos.FetcherState.FETCH_FINISHED;
- return fileChunk;
+ fileChunks.add(fileChunk);
+ fileLen = fileChunk.getFile().length();
+ return fileChunks;
+ }
+
+ if (state == FetcherState.FETCH_INIT) {
+ ChannelInitializer<Channel> initializer = new HttpClientChannelInitializer(fileChunk.getFile());
+ bootstrap.handler(initializer);
}
this.startTime = System.currentTimeMillis();
@@ -136,7 +146,7 @@ public class Fetcher {
ChannelFuture future = null;
try {
future = bootstrap.clone().connect(new InetSocketAddress(host, port))
- .addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
+ .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
// Wait until the connection attempt succeeds or fails.
Channel channel = future.awaitUninterruptibly().channel();
@@ -154,7 +164,7 @@ public class Fetcher {
request.headers().set(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP);
if(LOG.isDebugEnabled()) {
- LOG.info("Status: " + getState() + ", URI:" + uri);
+ LOG.debug("Status: " + getState() + ", URI:" + uri);
}
// Send the HTTP request.
channel.writeAndFlush(request);
@@ -163,7 +173,18 @@ public class Fetcher {
channel.closeFuture().syncUninterruptibly();
fileChunk.setLength(fileChunk.getFile().length());
- return fileChunk;
+
+ long start = 0;
+ for (Long eachChunkLength : chunkLengths) {
+ if (eachChunkLength == 0) continue;
+ FileChunk chunk = new FileChunk(fileChunk.getFile(), start, eachChunkLength);
+ chunk.setEbId(fileChunk.getEbId());
+ chunk.setFromRemote(fileChunk.fromRemote());
+ fileChunks.add(chunk);
+ start += eachChunkLength;
+ }
+ return fileChunks;
+
} finally {
if(future != null && future.channel().isOpen()){
// Close the channel to exit.
@@ -226,6 +247,13 @@ public class Fetcher {
}
}
}
+ if (response.headers().contains(TajoPullServerService.CHUNK_LENGTH_HEADER_NAME)) {
+ String stringOffset = response.headers().get(TajoPullServerService.CHUNK_LENGTH_HEADER_NAME);
+
+ for (String eachSplit : stringOffset.split(",")) {
+ chunkLengths.add(Long.parseLong(eachSplit));
+ }
+ }
}
if (LOG.isDebugEnabled()) {
LOG.debug(sb.toString());
@@ -296,6 +324,7 @@ public class Fetcher {
if(getState() != TajoProtos.FetcherState.FETCH_FINISHED){
//channel is closed, but cannot complete fetcher
finishTime = System.currentTimeMillis();
+ LOG.error("Channel closed by peer: " + ctx.channel());
state = TajoProtos.FetcherState.FETCH_FAILED;
}
IOUtils.cleanup(LOG, fc, raf);
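
With this change Fetcher.get() returns a list of FileChunks instead of a single chunk: the pull server reports the length of each logical chunk in a comma-separated response header (TajoPullServerService.CHUNK_LENGTH_HEADER_NAME), and the fetcher carves the downloaded file into consecutive (offset, length) ranges accordingly. Below is a small self-contained sketch of that carving step, using a hypothetical Span class in place of FileChunk and a made-up header value.

import java.util.ArrayList;
import java.util.List;

public class ChunkSplitSketch {

  // Simple (offset, length) pair standing in for FileChunk in this sketch.
  static class Span {
    final long offset;
    final long length;
    Span(long offset, long length) { this.offset = offset; this.length = length; }
    @Override
    public String toString() { return "offset=" + offset + ", length=" + length; }
  }

  // Turns the comma-separated chunk lengths reported by the pull server into
  // consecutive spans over the single downloaded file.
  static List<Span> splitByLengths(String headerValue) {
    List<Span> spans = new ArrayList<>();
    long start = 0;
    for (String each : headerValue.split(",")) {
      long length = Long.parseLong(each.trim());
      if (length == 0) {
        continue;                                  // empty chunks are skipped, as in the patch
      }
      spans.add(new Span(start, length));
      start += length;
    }
    return spans;
  }

  public static void main(String[] args) {
    // Hypothetical header value; the real header name comes from TajoPullServerService.
    for (Span span : splitByLengths("1048576,0,524288")) {
      System.out.println(span);
    }
  }
}
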