You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/09/24 10:16:12 UTC
git commit: TAJO-983: Worker should directly read Intermediate data
stored in localhost rather than fetching. (Mai Hai Thanh via hyunsik)
Repository: tajo
Updated Branches:
refs/heads/master 9e026a9a2 -> 644b7cd99
TAJO-983: Worker should directly read Intermediate data stored in localhost rather than fetching. (Mai Hai Thanh via hyunsik)
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/644b7cd9
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/644b7cd9
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/644b7cd9
Branch: refs/heads/master
Commit: 644b7cd991ea402115c6dc1d198e7f1d7f41771b
Parents: 9e026a9
Author: Hyunsik Choi <hy...@apache.org>
Authored: Wed Sep 24 01:08:43 2014 -0700
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Wed Sep 24 01:08:43 2014 -0700
----------------------------------------------------------------------
CHANGES | 3 +
.../engine/planner/UniformRangePartition.java | 3 +-
.../planner/physical/ExternalSortExec.java | 92 +++++++---
.../java/org/apache/tajo/worker/Fetcher.java | 37 ++--
.../main/java/org/apache/tajo/worker/Task.java | 182 ++++++++++++++++++-
.../org/apache/tajo/worker/TestFetcher.java | 25 ++-
.../java/org/apache/tajo/storage/RawFile.java | 71 +++++---
.../tajo/pullserver/PullServerAuxService.java | 6 +-
.../tajo/pullserver/TajoPullServerService.java | 8 +-
.../tajo/pullserver/retriever/FileChunk.java | 38 +++-
10 files changed, 371 insertions(+), 94 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/644b7cd9/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 7cbd524..6a64bc9 100644
--- a/CHANGES
+++ b/CHANGES
@@ -31,6 +31,9 @@ Release 0.9.0 - unreleased
IMPROVEMENT
+ TAJO-983: Worker should directly read Intermediate data stored in localhost
+ rather than fetching. (Mai Hai Thanh via hyunsik)
+
TAJO-910: Simple query (non-forwarded query) should be supported against
partition tables. (Hyoungjun Kim)
http://git-wip-us.apache.org/repos/asf/tajo/blob/644b7cd9/tajo-core/src/main/java/org/apache/tajo/engine/planner/UniformRangePartition.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/UniformRangePartition.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/UniformRangePartition.java
index db12285..551a9d0 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/UniformRangePartition.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/UniformRangePartition.java
@@ -20,7 +20,6 @@ package org.apache.tajo.engine.planner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
-import com.google.common.primitives.UnsignedLong;
import com.sun.tools.javac.util.Convert;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.SortSpec;
@@ -571,7 +570,7 @@ public class UniformRangePartition extends RangePartitionAlgorithm {
} else {
if (isPureAscii[i]) {
- lastBigInt = UnsignedLong.valueOf(new BigInteger(last.get(i).asByteArray())).bigIntegerValue();
+ lastBigInt = new BigInteger(last.get(i).asByteArray());
if (sortSpecs[i].isAscending()) {
end.put(i, DatumFactory.createText(lastBigInt.add(incs[i]).toByteArray()));
} else {
http://git-wip-us.apache.org/repos/asf/tajo/blob/644b7cd9/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
index 700e34d..0094590 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
@@ -19,6 +19,7 @@
package org.apache.tajo.engine.planner.physical;
import com.google.common.annotations.VisibleForTesting;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.LocalDirAllocator;
@@ -45,6 +46,7 @@ import org.apache.tajo.util.FileUtil;
import org.apache.tajo.util.TUtil;
import org.apache.tajo.worker.TaskAttemptContext;
+import java.io.File;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.*;
@@ -66,6 +68,8 @@ import static org.apache.tajo.storage.RawFile.RawFileScanner;
public class ExternalSortExec extends SortExec {
/** Class logger */
private static final Log LOG = LogFactory.getLog(ExternalSortExec.class);
+ /** The prefix of fragment name for intermediate */
+ private static final String INTERMEDIATE_FILE_PREFIX = "@interFile_";
private SortNode plan;
private final TableMeta meta;
@@ -86,9 +90,9 @@ public class ExternalSortExec extends SortExec {
/** local file system */
private final RawLocalFileSystem localFS;
/** final output files which are used for cleaning */
- private List<Path> finalOutputFiles = null;
+ private List<FileFragment> finalOutputFiles = null;
/** for directly merging sorted inputs */
- private List<Path> mergedInputPaths = null;
+ private List<FileFragment> mergedInputFragments = null;
///////////////////////////////////////////////////
// transient variables
@@ -129,10 +133,10 @@ public class ExternalSortExec extends SortExec {
final CatalogProtos.FragmentProto[] fragments) throws PhysicalPlanningException {
this(context, sm, plan);
- mergedInputPaths = TUtil.newList();
+ mergedInputFragments = TUtil.newList();
for (CatalogProtos.FragmentProto proto : fragments) {
FileFragment fragment = FragmentConvertor.convert(FileFragment.class, proto);
- mergedInputPaths.add(fragment.getPath());
+ mergedInputFragments.add(fragment);
}
}
@@ -269,9 +273,9 @@ public class ExternalSortExec extends SortExec {
if (!sorted) { // if not sorted, first sort all data
// if input files are given, it starts merging directly.
- if (mergedInputPaths != null) {
+ if (mergedInputFragments != null) {
try {
- this.result = externalMergeAndSort(mergedInputPaths);
+ this.result = externalMergeAndSort(mergedInputFragments);
} catch (Exception e) {
throw new PhysicalPlanningException(e);
}
@@ -287,7 +291,14 @@ public class ExternalSortExec extends SortExec {
} else { // if input data exceeds main-memory at least once
try {
- this.result = externalMergeAndSort(chunks);
+ List<FileFragment> fragments = TUtil.newList();
+ for (Path chunk : chunks) {
+ FileFragment frag = new FileFragment("", chunk, 0,
+ new File(localFS.makeQualified(chunk).toUri()).length());
+ fragments.add(frag);
+ }
+
+ this.result = externalMergeAndSort(fragments);
} catch (Exception e) {
throw new PhysicalPlanningException(e);
}
@@ -328,11 +339,11 @@ public class ExternalSortExec extends SortExec {
return computedFanout;
}
- private Scanner externalMergeAndSort(List<Path> chunks)
+ private Scanner externalMergeAndSort(List<FileFragment> chunks)
throws IOException, ExecutionException, InterruptedException {
int level = 0;
- final List<Path> inputFiles = TUtil.newList(chunks);
- final List<Path> outputFiles = TUtil.newList();
+ final List<FileFragment> inputFiles = TUtil.newList(chunks);
+ final List<FileFragment> outputFiles = TUtil.newList();
int remainRun = inputFiles.size();
int chunksSize = chunks.size();
@@ -368,7 +379,7 @@ public class ExternalSortExec extends SortExec {
info(LOG, "Unbalanced merge possibility detected: number of remain input (" + remainInputRuns
+ ") and output files (" + outputFileNum + ") <= " + defaultFanout);
- List<Path> switched = TUtil.newList();
+ List<FileFragment> switched = TUtil.newList();
// switch the remain inputs to the next outputs
for (int j = startIdx; j < inputFiles.size(); j++) {
switched.add(inputFiles.get(j));
@@ -383,7 +394,7 @@ public class ExternalSortExec extends SortExec {
// wait for all sort runners
int finishedMerger = 0;
int index = 0;
- for (Future<Path> future : futures) {
+ for (Future<FileFragment> future : futures) {
outputFiles.add(future.get());
// Getting the number of merged files
finishedMerger += numberOfMergingFiles.get(index++);
@@ -391,11 +402,32 @@ public class ExternalSortExec extends SortExec {
progress = ((float)finishedMerger/(float)chunksSize) * 0.5f;
}
- // delete merged intermediate files
- for (Path path : inputFiles) {
- localFS.delete(path, true);
+ /*
+ * delete merged intermediate files
+ *
+ * There may be 4 different types of file fragments in the list inputFiles
+ * + A: a fragment created from fetched data from a remote host. By default, this fragment represents
+ * a whole physical file (i.e., startOffset == 0 and length == length of physical file)
+ * + B1: a fragment created from a local file (pseudo-fetched data from local host) in which the fragment
+ * represents the whole physical file (i.e., startOffset == 0 AND length == length of physical file)
+ * + B2: a fragment created from a local file (pseudo-fetched data from local host) in which the fragment
+ * represents only a part of the physical file (i.e., startOffset > 0 OR length != length of physical file)
+ * + C: a fragment created from merging some fragments of the above types. When this fragment is created,
+ * its startOffset is set to 0 and its length is set to the length of the physical file, automatically
+ *
+ * Fragments of types A, B1, and B2 are inputs of ExternalSortExec. Among them, only B2-type fragments will
+ * possibly be used by another task in the future. Thus, ideally, all fragments of types A, B1, and C can be
+ * deleted at this point. However, for the ease of future code maintenance, we delete only type-C fragments here
+ */
+ int numDeletedFiles = 0;
+ for (FileFragment frag : inputFiles) {
+ if (frag.getTableName().contains(INTERMEDIATE_FILE_PREFIX) == true) {
+ localFS.delete(frag.getPath(), true);
+ numDeletedFiles++;
+ LOG.info("Delete merged intermediate file: " + frag);
+ }
}
- info(LOG, inputFiles.size() + " merged intermediate files deleted");
+ info(LOG, numDeletedFiles + " merged intermediate files deleted");
// switch input files to output files, and then clear outputFiles
inputFiles.clear();
@@ -418,15 +450,15 @@ public class ExternalSortExec extends SortExec {
/**
* Merge Thread
*/
- private class KWayMergerCaller implements Callable<Path> {
+ private class KWayMergerCaller implements Callable<FileFragment> {
final int level;
final int nextRunId;
- final List<Path> inputFiles;
+ final List<FileFragment> inputFiles;
final int startIdx;
final int mergeFanout;
final boolean updateInputStats;
- public KWayMergerCaller(final int level, final int nextRunId, final List<Path> inputFiles,
+ public KWayMergerCaller(final int level, final int nextRunId, final List<FileFragment> inputFiles,
final int startIdx, final int mergeFanout, final boolean updateInputStats) {
this.level = level;
this.nextRunId = nextRunId;
@@ -437,7 +469,7 @@ public class ExternalSortExec extends SortExec {
}
@Override
- public Path call() throws Exception {
+ public FileFragment call() throws Exception {
final Path outputPath = getChunkPathForWrite(level + 1, nextRunId);
info(LOG, mergeFanout + " files are being merged to an output file " + outputPath.getName());
long mergeStartTime = System.currentTimeMillis();
@@ -455,7 +487,9 @@ public class ExternalSortExec extends SortExec {
info(LOG, outputPath.getName() + " is written to a disk. ("
+ FileUtil.humanReadableByteCount(output.getOffset(), false)
+ " bytes, " + (mergeEndTime - mergeStartTime) + " msec)");
- return outputPath;
+ File f = new File(localFS.makeQualified(outputPath).toUri());
+ FileFragment frag = new FileFragment(INTERMEDIATE_FILE_PREFIX + outputPath.getName(), outputPath, 0, f.length());
+ return frag;
}
}
@@ -469,7 +503,7 @@ public class ExternalSortExec extends SortExec {
/**
* Create a merged file scanner or k-way merge scanner.
*/
- private Scanner createFinalMerger(List<Path> inputs) throws IOException {
+ private Scanner createFinalMerger(List<FileFragment> inputs) throws IOException {
if (inputs.size() == 1) {
this.result = getFileScanner(inputs.get(0));
} else {
@@ -478,11 +512,11 @@ public class ExternalSortExec extends SortExec {
return result;
}
- private Scanner getFileScanner(Path path) throws IOException {
- return new RawFileScanner(context.getConf(), plan.getInSchema(), meta, path);
+ private Scanner getFileScanner(FileFragment frag) throws IOException {
+ return new RawFileScanner(context.getConf(), plan.getInSchema(), meta, frag);
}
- private Scanner createKWayMerger(List<Path> inputs, final int startChunkId, final int num) throws IOException {
+ private Scanner createKWayMerger(List<FileFragment> inputs, final int startChunkId, final int num) throws IOException {
final Scanner [] sources = new Scanner[num];
for (int i = 0; i < num; i++) {
sources[i] = getFileScanner(inputs.get(startChunkId + i));
@@ -741,8 +775,12 @@ public class ExternalSortExec extends SortExec {
}
if (finalOutputFiles != null) {
- for (Path path : finalOutputFiles) {
- localFS.delete(path, true);
+ for (FileFragment frag : finalOutputFiles) {
+ File tmpFile = new File(localFS.makeQualified(frag.getPath()).toUri());
+ if (frag.getStartKey() == 0 && frag.getEndKey() == tmpFile.length()) {
+ localFS.delete(frag.getPath(), true);
+ LOG.info("Delete file: " + frag);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/644b7cd9/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java b/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
index 4867fe4..742a025 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
@@ -23,6 +23,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.IOUtils;
import org.apache.tajo.TajoProtos;
import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.pullserver.retriever.FileChunk;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.*;
@@ -51,11 +52,12 @@ public class Fetcher {
private final static Log LOG = LogFactory.getLog(Fetcher.class);
private final URI uri;
- private final File file;
+ private final FileChunk fileChunk;
private final TajoConf conf;
private final String host;
private int port;
+ private final boolean useLocalFile;
private long startTime;
private long finishTime;
@@ -66,9 +68,10 @@ public class Fetcher {
private ClientBootstrap bootstrap;
- public Fetcher(TajoConf conf, URI uri, File file, ClientSocketChannelFactory factory, Timer timer) {
+ public Fetcher(TajoConf conf, URI uri, FileChunk chunk, ClientSocketChannelFactory factory, Timer timer) {
this.uri = uri;
- this.file = file;
+ this.fileChunk = chunk;
+ this.useLocalFile = !chunk.fromRemote();
this.state = TajoProtos.FetcherState.FETCH_INIT;
this.conf = conf;
this.timer = timer;
@@ -84,13 +87,15 @@ public class Fetcher {
}
}
- bootstrap = new ClientBootstrap(factory);
- bootstrap.setOption("connectTimeoutMillis", 5000L); // set 5 sec
- bootstrap.setOption("receiveBufferSize", 1048576); // set 1M
- bootstrap.setOption("tcpNoDelay", true);
+ if (!useLocalFile) {
+ bootstrap = new ClientBootstrap(factory);
+ bootstrap.setOption("connectTimeoutMillis", 5000L); // set 5 sec
+ bootstrap.setOption("receiveBufferSize", 1048576); // set 1M
+ bootstrap.setOption("tcpNoDelay", true);
- ChannelPipelineFactory pipelineFactory = new HttpClientPipelineFactory(file);
- bootstrap.setPipelineFactory(pipelineFactory);
+ ChannelPipelineFactory pipelineFactory = new HttpClientPipelineFactory(fileChunk.getFile());
+ bootstrap.setPipelineFactory(pipelineFactory);
+ }
}
public long getStartTime() {
@@ -113,7 +118,16 @@ public class Fetcher {
return messageReceiveCount;
}
- public File get() throws IOException {
+ public FileChunk get() throws IOException {
+ if (useLocalFile) {
+ LOG.info("Get pseudo fetch from local host");
+ startTime = System.currentTimeMillis();
+ finishTime = System.currentTimeMillis();
+ state = TajoProtos.FetcherState.FETCH_FINISHED;
+ return fileChunk;
+ }
+
+ LOG.info("Get real fetch from remote host");
this.startTime = System.currentTimeMillis();
this.state = TajoProtos.FetcherState.FETCH_FETCHING;
ChannelFuture future = null;
@@ -145,7 +159,8 @@ public class Fetcher {
channelFuture.addListener(ChannelFutureListener.CLOSE);
- return file;
+ fileChunk.setLength(fileChunk.getFile().length());
+ return fileChunk;
} finally {
if(future != null){
// Close the channel to exit.
http://git-wip-us.apache.org/repos/asf/tajo/blob/644b7cd9/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
index 66e0f87..a7eaaf8 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
@@ -21,6 +21,7 @@ package org.apache.tajo.worker;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -48,15 +49,21 @@ import org.apache.tajo.engine.query.QueryUnitRequest;
import org.apache.tajo.ipc.QueryMasterProtocol;
import org.apache.tajo.ipc.TajoWorkerProtocol.*;
import org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty.EnforceType;
+import org.apache.tajo.pullserver.TajoPullServerService;
+import org.apache.tajo.pullserver.retriever.FileChunk;
import org.apache.tajo.rpc.NullCallback;
+import org.apache.tajo.storage.HashShuffleAppenderManager;
import org.apache.tajo.storage.StorageUtil;
import org.apache.tajo.storage.TupleComparator;
import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.util.NetUtils;
import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
+import org.jboss.netty.handler.codec.http.QueryStringDecoder;
import org.jboss.netty.util.Timer;
import java.io.File;
import java.io.IOException;
+import java.net.InetAddress;
import java.net.URI;
import java.text.NumberFormat;
import java.util.*;
@@ -91,6 +98,7 @@ public class Task {
private long finishTime;
private final TableStats inputStats;
+ private List<FileChunk> localChunks;
// TODO - to be refactored
private ShuffleType shuffleType = null;
@@ -190,6 +198,8 @@ public class Task {
context.setOutputPath(outFilePath);
}
+ this.localChunks = Collections.synchronizedList(new ArrayList<FileChunk>());
+
context.setState(TaskAttemptState.TA_PENDING);
LOG.info("==================================");
LOG.info("* Subquery " + request.getId() + " is initialized");
@@ -586,6 +596,17 @@ public class Task {
listTablets.add(tablet);
}
+ // Special treatment for locally pseudo fetched chunks
+ synchronized (localChunks) {
+ for (FileChunk chunk : localChunks) {
+ if (name.equals(chunk.getEbId())) {
+ tablet = new FileFragment(name, new Path(chunk.getFile().getPath()), chunk.startOffset(), chunk.length());
+ listTablets.add(tablet);
+ LOG.info("One local chunk is added to listTablets");
+ }
+ }
+ }
+
FileFragment[] tablets = new FileFragment[listTablets.size()];
listTablets.toArray(tablets);
@@ -620,8 +641,13 @@ public class Task {
LOG.warn("Retry on the fetch: " + fetcher.getURI() + " (" + retryNum + ")");
}
try {
- File fetched = fetcher.get();
- if (fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED && fetched != null) {
+ FileChunk fetched = fetcher.get();
+ if (fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED && fetched != null
+ && fetched.getFile() != null) {
+ if (fetched.fromRemote() == false) {
+ localChunks.add(fetched);
+ LOG.info("Add a new FileChunk to local chunk list");
+ }
break;
}
} catch (Throwable e) {
@@ -677,19 +703,55 @@ public class Task {
Timer timer = executionBlockContext.getRPCTimer();
Path inputDir = executionBlockContext.getLocalDirAllocator().
getLocalPathToRead(getTaskAttemptDir(ctx.getTaskId()).toString(), systemConf);
- File storeDir;
int i = 0;
- File storeFile;
+ File storeDir;
+ File defaultStoreFile;
+ FileChunk storeChunk = null;
List<Fetcher> runnerList = Lists.newArrayList();
+
for (FetchImpl f : fetches) {
+ storeDir = new File(inputDir.toString(), f.getName());
+ if (!storeDir.exists()) {
+ storeDir.mkdirs();
+ }
+
for (URI uri : f.getURIs()) {
- storeDir = new File(inputDir.toString(), f.getName());
- if (!storeDir.exists()) {
- storeDir.mkdirs();
+ defaultStoreFile = new File(storeDir, "in_" + i);
+ InetAddress address = InetAddress.getByName(uri.getHost());
+
+ if (NetUtils.isLocalAddress(address)) {
+ boolean hasError = false;
+ try {
+ LOG.info("Try to get local file chunk at local host");
+ storeChunk = getLocalStoredFileChunk(uri, systemConf);
+ } catch (Throwable t) {
+ hasError = true;
+ }
+
+ // When a range request is out of range, storeChunk will be NULL. This case is normal state.
+ // So, we should skip and don't need to create storeChunk.
+ if (storeChunk == null && !hasError) {
+ continue;
+ }
+
+ if (storeChunk != null && storeChunk.getFile() != null && storeChunk.startOffset() > -1
+ && hasError == false) {
+ storeChunk.setFromRemote(false);
+ } else {
+ storeChunk = new FileChunk(defaultStoreFile, 0, -1);
+ storeChunk.setFromRemote(true);
+ }
+ } else {
+ storeChunk = new FileChunk(defaultStoreFile, 0, -1);
+ storeChunk.setFromRemote(true);
}
- storeFile = new File(storeDir, "in_" + i);
- Fetcher fetcher = new Fetcher(systemConf, uri, storeFile, channelFactory, timer);
+
+ // If we decide that intermediate data should be really fetched from a remote host, storeChunk
+ // represents a complete file. Otherwise, storeChunk may represent a complete file or only a part of it
+ storeChunk.setEbId(f.getName());
+ Fetcher fetcher = new Fetcher(systemConf, uri, storeChunk, channelFactory, timer);
+ LOG.info("Create a new Fetcher with storeChunk:" + storeChunk.toString());
runnerList.add(fetcher);
i++;
}
@@ -701,6 +763,108 @@ public class Task {
}
}
+  /**
+   * Resolves a pull-server fetch URI to the intermediate data it designates on this worker's local
+   * disk, so that the data can be read in place rather than fetched.
+   *
+   * <p>The query string must carry <code>qid</code>, <code>type</code>, <code>sid</code> and
+   * <code>p</code>. A range shuffle (<code>type=r</code>) additionally reads <code>ta</code>,
+   * <code>start</code>, <code>end</code> and the optional flag <code>final</code>. A hash
+   * (<code>type=h</code>) or scattered hash (<code>type=s</code>) shuffle may pass
+   * <code>offset</code> and <code>length</code>; when either is absent or negative the whole file
+   * is used.</p>
+   *
+   * @param fetchURI the fetch URI whose query parameters identify the intermediate data
+   * @param conf     the configuration used to look up the worker's local directories
+   * @return the chunk of the local file to read, or <code>null</code> when the URI is invalid, the
+   *         shuffle type is unknown, the data does not exist locally, the requested start position
+   *         is at or beyond the end of the file, or the range lookup fails or yields nothing
+   * @throws IOException propagated from the local directory lookups
+   */
+  private FileChunk getLocalStoredFileChunk(URI fetchURI, TajoConf conf) throws IOException {
+    // Parse the URI
+    LOG.info("getLocalStoredFileChunk starts");
+    final Map<String, List<String>> params = new QueryStringDecoder(fetchURI.toString()).getParameters();
+    final List<String> types = params.get("type");
+    final List<String> qids = params.get("qid");
+    final List<String> taskIdList = params.get("ta");
+    final List<String> subQueryIds = params.get("sid");
+    final List<String> partIds = params.get("p");
+    final List<String> offsetList = params.get("offset");
+    final List<String> lengthList = params.get("length");
+
+    if (types == null || subQueryIds == null || qids == null || partIds == null) {
+      LOG.error("Invalid URI - Required queryId, type, subquery Id, and part id");
+      return null;
+    }
+
+    // Each of these parameters must appear exactly once, so the URI is rejected as soon as ANY of
+    // them is repeated (hence || between all three tests).
+    if (qids.size() != 1 || types.size() != 1 || subQueryIds.size() != 1) {
+      LOG.error("Invalid URI - Required qids, type, taskIds, subquery Id, and part id");
+      return null;
+    }
+
+    String queryId = qids.get(0);
+    String shuffleType = types.get(0);
+    String sid = subQueryIds.get(0);
+    String partId = partIds.get(0);
+
+    if (shuffleType.equals("r") && taskIdList == null) {
+      LOG.error("Invalid URI - For range shuffle, taskId is required");
+      return null;
+    }
+    // taskIds stays null when "ta" is absent; it is only dereferenced in the range-shuffle branch,
+    // which the check above guarantees is reached with a non-null list.
+    List<String> taskIds = splitMaps(taskIdList);
+
+    FileChunk chunk = null;
+    // -1 marks "not supplied"; both values must be supplied (and non-negative) to take effect below.
+    long offset = (offsetList != null && !offsetList.isEmpty()) ? Long.parseLong(offsetList.get(0)) : -1L;
+    long length = (lengthList != null && !lengthList.isEmpty()) ? Long.parseLong(lengthList.get(0)) : -1L;
+
+    LOG.info("PullServer request param: shuffleType=" + shuffleType + ", sid=" + sid + ", partId=" + partId
+        + ", taskIds=" + taskIdList);
+
+    // The working directory of Tajo worker for each query, including subquery
+    String queryBaseDir = queryId + "/output" + "/" + sid + "/";
+
+    // If the subquery requires a range shuffle
+    if (shuffleType.equals("r")) {
+      String ta = taskIds.get(0);
+      if (!executionBlockContext.getLocalDirAllocator().ifExists(queryBaseDir + ta + "/output/", conf)) {
+        LOG.warn("Range shuffle - file not exist");
+        return null;
+      }
+      Path path = executionBlockContext.getLocalFS().makeQualified(
+          executionBlockContext.getLocalDirAllocator().getLocalPathToRead(queryBaseDir + ta + "/output/", conf));
+      // "start" and "end" are mandatory here: a missing one makes params.get(...) return null and the
+      // chained get(0) throw a NullPointerException.
+      String startKey = params.get("start").get(0);
+      String endKey = params.get("end").get(0);
+      boolean last = params.get("final") != null;
+
+      try {
+        chunk = TajoPullServerService.getFileCunks(path, startKey, endKey, last);
+      } catch (Throwable t) {
+        // Keep the cause in the log; without it the failure cannot be diagnosed.
+        LOG.error("getFileChunks() throws exception", t);
+        return null;
+      }
+
+    // If the subquery requires a hash shuffle or a scattered hash shuffle
+    } else if (shuffleType.equals("h") || shuffleType.equals("s")) {
+      int partParentId = HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), conf);
+      String partPath = queryBaseDir + "hash-shuffle/" + partParentId + "/" + partId;
+      if (!executionBlockContext.getLocalDirAllocator().ifExists(partPath, conf)) {
+        LOG.warn("Hash shuffle or Scattered hash shuffle - file not exist: " + partPath);
+        return null;
+      }
+      Path path = executionBlockContext.getLocalFS().makeQualified(
+          executionBlockContext.getLocalDirAllocator().getLocalPathToRead(partPath, conf));
+      File file = new File(path.toUri());
+      // Without a complete (offset, length) pair the chunk covers the whole file.
+      long startPos = (offset >= 0 && length >= 0) ? offset : 0;
+      long readLen = (offset >= 0 && length >= 0) ? length : file.length();
+
+      if (startPos >= file.length()) {
+        LOG.error("Start pos[" + startPos + "] great than file length [" + file.length() + "]");
+        return null;
+      }
+      chunk = new FileChunk(file, startPos, readLen);
+
+    } else {
+      LOG.error("Unknown shuffle type");
+      return null;
+    }
+
+    return chunk;
+  }
+
+  /**
+   * Flattens a list of comma-separated strings into a single list of their parts.
+   *
+   * @param mapq the comma-separated strings, or <code>null</code>
+   * @return a new list holding the parts of every element in order, or <code>null</code> when
+   *         <code>mapq</code> is <code>null</code>
+   */
+  private List<String> splitMaps(List<String> mapq) {
+    if (mapq == null) {
+      return null;
+    }
+
+    final List<String> tokens = new ArrayList<String>();
+    for (String joined : mapq) {
+      for (String token : joined.split(",")) {
+        tokens.add(token);
+      }
+    }
+    return tokens;
+  }
+
public static Path getTaskAttemptDir(QueryUnitAttemptId quid) {
Path workDir =
StorageUtil.concatPath(ExecutionBlockContext.getBaseInputDir(quid.getQueryUnitId().getExecutionBlockId()),
http://git-wip-us.apache.org/repos/asf/tajo/blob/644b7cd9/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java b/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java
index b15d523..551610b 100644
--- a/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java
+++ b/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java
@@ -25,6 +25,7 @@ import org.apache.tajo.TajoProtos;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.pullserver.TajoPullServerService;
+import org.apache.tajo.pullserver.retriever.FileChunk;
import org.apache.tajo.rpc.RpcChannelFactory;
import org.apache.tajo.storage.HashShuffleAppenderManager;
import org.apache.tajo.util.CommonTestingUtil;
@@ -98,8 +99,12 @@ public class TestFetcher {
stream.close();
URI uri = URI.create("http://127.0.0.1:" + pullServerService.getPort() + "/?" + params);
- final Fetcher fetcher = new Fetcher(conf, uri, new File(OUTPUT_DIR + "data"), channelFactory, timer);
- assertNotNull(fetcher.get());
+ FileChunk storeChunk = new FileChunk(new File(OUTPUT_DIR + "data"), 0, 0);
+ storeChunk.setFromRemote(true);
+ final Fetcher fetcher = new Fetcher(conf, uri, storeChunk, channelFactory, timer);
+ FileChunk chunk = fetcher.get();
+ assertNotNull(chunk);
+ assertNotNull(chunk.getFile());
FileSystem fs = FileSystem.getLocal(new TajoConf());
FileStatus inStatus = fs.getFileStatus(inputPath);
@@ -140,7 +145,9 @@ public class TestFetcher {
stream.close();
URI uri = URI.create("http://127.0.0.1:" + pullServerService.getPort() + "/?" + params);
- final Fetcher fetcher = new Fetcher(conf, uri, new File(OUTPUT_DIR + "data"), channelFactory, timer);
+ FileChunk storeChunk = new FileChunk(new File(OUTPUT_DIR + "data"), 0, 0);
+ storeChunk.setFromRemote(true);
+ final Fetcher fetcher = new Fetcher(conf, uri, storeChunk, channelFactory, timer);
assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState());
fetcher.get();
@@ -168,7 +175,9 @@ public class TestFetcher {
stream.close();
URI uri = URI.create("http://127.0.0.1:" + pullServerService.getPort() + "/?" + params);
- final Fetcher fetcher = new Fetcher(conf, uri, new File(OUTPUT_DIR + "data"), channelFactory, timer);
+ FileChunk storeChunk = new FileChunk(new File(OUTPUT_DIR + "data"), 0, 0);
+ storeChunk.setFromRemote(true);
+ final Fetcher fetcher = new Fetcher(conf, uri, storeChunk, channelFactory, timer);
assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState());
fetcher.get();
@@ -200,7 +209,9 @@ public class TestFetcher {
stream.close();
URI uri = URI.create("http://127.0.0.1:" + pullServerService.getPort() + "/?" + params);
- final Fetcher fetcher = new Fetcher(conf, uri, new File(OUTPUT_DIR + "data"), channelFactory, timer);
+ FileChunk storeChunk = new FileChunk(new File(OUTPUT_DIR + "data"), 0, 0);
+ storeChunk.setFromRemote(true);
+ final Fetcher fetcher = new Fetcher(conf, uri, storeChunk, channelFactory, timer);
assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState());
fetcher.get();
@@ -218,7 +229,9 @@ public class TestFetcher {
String params = String.format("qid=%s&sid=%s&p=%s&type=%s&ta=%s", queryId, sid, partId, "h", ta);
URI uri = URI.create("http://127.0.0.1:" + pullServerService.getPort() + "/?" + params);
- final Fetcher fetcher = new Fetcher(conf, uri, new File(OUTPUT_DIR + "data"), channelFactory, timer);
+ FileChunk storeChunk = new FileChunk(new File(OUTPUT_DIR + "data"), 0, 0);
+ storeChunk.setFromRemote(true);
+ final Fetcher fetcher = new Fetcher(conf, uri, storeChunk, channelFactory, timer);
assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState());
pullServerService.stop();
http://git-wip-us.apache.org/repos/asf/tajo/blob/644b7cd9/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java b/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java
index 1f57675..edcf686 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java
@@ -19,6 +19,7 @@
package org.apache.tajo.storage;
import com.google.protobuf.Message;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -32,6 +33,7 @@ import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.datum.NullDatum;
import org.apache.tajo.datum.ProtobufDatumFactory;
import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.unit.StorageUnit;
import org.apache.tajo.util.BitArray;
import java.io.File;
@@ -47,36 +49,31 @@ public class RawFile {
public static class RawFileScanner extends FileScanner implements SeekableScanner {
private FileChannel channel;
private DataType[] columnTypes;
- private Path path;
private ByteBuffer buffer;
+ private int bufferSize;
private Tuple tuple;
- private int headerSize = 0;
+ private int headerSize = 0; // Header size of a tuple
private BitArray nullFlags;
private static final int RECORD_SIZE = 4;
private boolean eof = false;
- private long fileSize;
+ private long fileLimit; // If this.fragment represents a complete file, this value is equal to the file's size
+ private long numBytesRead;
private FileInputStream fis;
private long recordCount;
- public RawFileScanner(Configuration conf, Schema schema, TableMeta meta, Path path) throws IOException {
- super(conf, schema, meta, null);
- this.path = path;
- }
-
- @SuppressWarnings("unused")
public RawFileScanner(Configuration conf, Schema schema, TableMeta meta, FileFragment fragment) throws IOException {
- this(conf, schema, meta, fragment.getPath());
+ super(conf, schema, meta, fragment);
}
public void init() throws IOException {
File file;
try {
- if (path.toUri().getScheme() != null) {
- file = new File(path.toUri());
+ if (fragment.getPath().toUri().getScheme() != null) {
+ file = new File(fragment.getPath().toUri());
} else {
- file = new File(path.toString());
+ file = new File(fragment.getPath().toString());
}
} catch (IllegalArgumentException iae) {
throw new IOException(iae);
@@ -84,16 +81,22 @@ public class RawFile {
fis = new FileInputStream(file);
channel = fis.getChannel();
- fileSize = channel.size();
+ fileLimit = fragment.getStartKey() + fragment.getEndKey(); // fileLimit is less than or equal to fileSize
if (tableStats != null) {
- tableStats.setNumBytes(fileSize);
+ tableStats.setNumBytes(fragment.getEndKey());
}
if (LOG.isDebugEnabled()) {
- LOG.debug("RawFileScanner open:" + path + "," + channel.position() + ", size :" + channel.size());
+ LOG.debug("RawFileScanner open:" + fragment + "," + channel.position() + ", total file size :" + channel.size()
+ + ", fragment size :" + fragment.getEndKey() + ", fileLimit: " + fileLimit);
}
- buffer = ByteBuffer.allocateDirect(64 * 1024);
+ if (fragment.getEndKey() < 64 * StorageUnit.KB) {
+ bufferSize = fragment.getEndKey().intValue();
+ } else {
+ bufferSize = 64 * StorageUnit.KB;
+ }
+ buffer = ByteBuffer.allocateDirect(bufferSize);
columnTypes = new DataType[schema.size()];
for (int i = 0; i < schema.size(); i++) {
@@ -101,14 +104,14 @@ public class RawFile {
}
tuple = new VTuple(columnTypes.length);
+ nullFlags = new BitArray(schema.size());
+ headerSize = RECORD_SIZE + 2 + nullFlags.bytesLength(); // The middle 2 bytes is for NullFlagSize
// initial read
- channel.read(buffer);
+ channel.position(fragment.getStartKey());
+ numBytesRead = channel.read(buffer);
buffer.flip();
- nullFlags = new BitArray(schema.size());
- headerSize = RECORD_SIZE + 2 + nullFlags.bytesLength();
-
super.init();
}
@@ -125,19 +128,31 @@ public class RawFile {
} else {
buffer.clear();
channel.position(offset);
- channel.read(buffer);
+ int bytesRead = channel.read(buffer);
+ numBytesRead = bytesRead;
buffer.flip();
eof = false;
}
}
private boolean fillBuffer() throws IOException {
+ if (numBytesRead >= fragment.getEndKey()) {
+ eof = true;
+ return false;
+ }
+ int currentDataSize = buffer.remaining();
buffer.compact();
- if (channel.read(buffer) == -1) {
+ int bytesRead = channel.read(buffer);
+ if (bytesRead == -1) {
eof = true;
return false;
} else {
buffer.flip();
+ long realRemaining = fragment.getEndKey() - numBytesRead;
+ numBytesRead += bytesRead;
+ if (realRemaining < bufferSize) {
+ buffer.limit(currentDataSize + (int) realRemaining);
+ }
return true;
}
}
@@ -356,7 +371,7 @@ public class RawFile {
}
}
- if(!buffer.hasRemaining() && channel.position() == fileSize){
+ if(!buffer.hasRemaining() && channel.position() == fileLimit){
eof = true;
}
return new VTuple(tuple);
@@ -368,7 +383,7 @@ public class RawFile {
buffer.clear();
// reload initial buffer
channel.position(0);
- channel.read(buffer);
+ numBytesRead = channel.read(buffer);
buffer.flip();
eof = false;
}
@@ -376,7 +391,7 @@ public class RawFile {
@Override
public void close() throws IOException {
if (tableStats != null) {
- tableStats.setReadBytes(fileSize);
+ tableStats.setReadBytes(fragment.getEndKey());
tableStats.setNumRows(recordCount);
}
@@ -410,14 +425,14 @@ public class RawFile {
}
if(eof || channel == null) {
- tableStats.setReadBytes(fileSize);
+ tableStats.setReadBytes(fragment.getEndKey());
return 1.0f;
}
if (filePos == 0) {
return 0.0f;
} else {
- return Math.min(1.0f, ((float)filePos / (float)fileSize));
+ return Math.min(1.0f, ((float)filePos / fragment.getEndKey().floatValue()));
}
} catch (IOException e) {
LOG.error(e.getMessage(), e);
http://git-wip-us.apache.org/repos/asf/tajo/blob/644b7cd9/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java
----------------------------------------------------------------------
diff --git a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java
index 5f9f9e8..1c63c8a 100644
--- a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java
+++ b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java
@@ -488,20 +488,20 @@ public class PullServerAuxService extends AuxiliaryService {
ChannelFuture writeFuture;
if (ch.getPipeline().get(SslHandler.class) == null) {
final FadvisedFileRegion partition = new FadvisedFileRegion(spill,
- file.startOffset, file.length(), manageOsCache, readaheadLength,
+ file.startOffset(), file.length(), manageOsCache, readaheadLength,
readaheadPool, file.getFile().getAbsolutePath());
writeFuture = ch.write(partition);
writeFuture.addListener(new FileCloseListener(partition, null, 0, null));
} else {
// HTTPS cannot be done with zero copy.
final FadvisedChunkedFile chunk = new FadvisedChunkedFile(spill,
- file.startOffset, file.length, sslFileBufferSize,
+ file.startOffset(), file.length(), sslFileBufferSize,
manageOsCache, readaheadLength, readaheadPool,
file.getFile().getAbsolutePath());
writeFuture = ch.write(chunk);
}
metrics.shuffleConnections.incr();
- metrics.shuffleOutputBytes.incr(file.length); // optimistic
+ metrics.shuffleOutputBytes.incr(file.length()); // optimistic
return writeFuture;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/644b7cd9/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
----------------------------------------------------------------------
diff --git a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
index 3fa67ae..2fb7c29 100644
--- a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
+++ b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
@@ -644,14 +644,14 @@ public class TajoPullServerService extends AbstractService {
spill = new RandomAccessFile(file.getFile(), "r");
if (ch.getPipeline().get(SslHandler.class) == null) {
final FadvisedFileRegion filePart = new FadvisedFileRegion(spill,
- file.startOffset, file.length(), manageOsCache, readaheadLength,
+ file.startOffset(), file.length(), manageOsCache, readaheadLength,
readaheadPool, file.getFile().getAbsolutePath());
writeFuture = ch.write(filePart);
writeFuture.addListener(new FileCloseListener(filePart, requestUri, startTime, TajoPullServerService.this));
} else {
// HTTPS cannot be done with zero copy.
final FadvisedChunkedFile chunk = new FadvisedChunkedFile(spill,
- file.startOffset, file.length, sslFileBufferSize,
+ file.startOffset(), file.length(), sslFileBufferSize,
manageOsCache, readaheadLength, readaheadPool,
file.getFile().getAbsolutePath());
writeFuture = ch.write(chunk);
@@ -667,7 +667,7 @@ public class TajoPullServerService extends AbstractService {
return null;
}
metrics.shuffleConnections.incr();
- metrics.shuffleOutputBytes.incr(file.length); // optimistic
+ metrics.shuffleOutputBytes.incr(file.length()); // optimistic
return writeFuture;
}
@@ -698,7 +698,7 @@ public class TajoPullServerService extends AbstractService {
}
}
- public FileChunk getFileCunks(Path outDir,
+ public static FileChunk getFileCunks(Path outDir,
String startKey,
String endKey,
boolean last) throws IOException {
http://git-wip-us.apache.org/repos/asf/tajo/blob/644b7cd9/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/FileChunk.java
----------------------------------------------------------------------
diff --git a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/FileChunk.java b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/FileChunk.java
index a8b424e..67cff21 100644
--- a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/FileChunk.java
+++ b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/FileChunk.java
@@ -23,8 +23,18 @@ import java.io.FileNotFoundException;
public class FileChunk {
private final File file;
- public final long startOffset;
- public final long length;
+ private final long startOffset;
+ private long length;
+
+ /**
+ * TRUE if this.file is created by getting data from a remote host (e.g., by HttpRequest). FALSE otherwise.
+ */
+ private boolean fromRemote;
+
+ /**
+ * ExecutionBlockId
+ */
+ private String ebId;
public FileChunk(File file, long startOffset, long length) throws FileNotFoundException {
this.file = file;
@@ -44,8 +54,28 @@ public class FileChunk {
return this.length;
}
+ public void setLength(long newLength) {
+ this.length = newLength;
+ }
+
+ public boolean fromRemote() {
+ return this.fromRemote;
+ }
+
+ public void setFromRemote(boolean newVal) {
+ this.fromRemote = newVal;
+ }
+
+ public String getEbId() {
+ return this.ebId;
+ }
+
+ public void setEbId(String newVal) {
+ this.ebId = newVal;
+ }
+
public String toString() {
- return " (start=" + startOffset() + ", length=" + length + ") "
- + file.getAbsolutePath();
+ return " (start=" + startOffset() + ", length=" + length + ", fromRemote=" + fromRemote + ", ebId=" + ebId + ") "
+ + file.getAbsolutePath();
}
}