You are viewing a plain-text version of this content. The canonical link to it was a hyperlink, which this plain-text copy does not preserve.
Posted to commits@tajo.apache.org by ji...@apache.org on 2014/08/11 10:11:36 UTC
[07/17] git commit: TAJO-949: PullServer does not release files,
when a channel throws an internal exception. (jinho)
TAJO-949: PullServer does not release files when a channel throws an internal exception. (jinho)
Closes #76
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/242d6ad6
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/242d6ad6
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/242d6ad6
Branch: refs/heads/index_support
Commit: 242d6ad656685b82f2c552aa09abe862107ae366
Parents: 0f3412a
Author: jinossy <ji...@gmail.com>
Authored: Tue Aug 5 17:14:47 2014 +0900
Committer: jinossy <ji...@gmail.com>
Committed: Tue Aug 5 17:14:47 2014 +0900
----------------------------------------------------------------------
CHANGES | 3 ++
.../java/org/apache/tajo/conf/TajoConf.java | 6 +--
.../apache/tajo/engine/function/math/Round.java | 2 +-
.../querymaster/QueryMasterManagerService.java | 5 +-
.../java/org/apache/tajo/worker/Fetcher.java | 31 +++++++----
.../main/java/org/apache/tajo/worker/Task.java | 2 +-
.../org/apache/tajo/worker/TestFetcher.java | 29 ++++++++++-
.../tajo/pullserver/TajoPullServerService.java | 54 ++++++++++----------
8 files changed, 87 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/242d6ad6/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index be71bf4..e1d6d03 100644
--- a/CHANGES
+++ b/CHANGES
@@ -105,6 +105,9 @@ Release 0.9.0 - unreleased
(Hyoungjun Kim via hyunsik)
BUG FIXES
+
+ TAJO-949: PullServer does not release files, when a channel throws
+ an internal exception. (jinho)
TAJO-975: alias name which is the same to existing column name may cause
NPE during PPD. (hyunsik)
http://git-wip-us.apache.org/repos/asf/tajo/blob/242d6ad6/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index d5e8bc4..b75530b 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -217,9 +217,9 @@ public class TajoConf extends Configuration {
SHUFFLE_SSL_ENABLED_KEY("tajo.pullserver.ssl.enabled", false),
SHUFFLE_FILE_FORMAT("tajo.shuffle.file-format", "RAW"),
SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM("tajo.shuffle.fetcher.parallel-execution.max-num", 2),
- SHUFFLE_FETCHER_CHUNK_MAX_SIZE("tajo.shuffle.fetcher.chunk.max-size", 8192 * 8),
- SHUFFLE_FETCHER_READ_TIMEOUT("tajo.shuffle.fetcher.read.timeout-sec", 5),
- SHUFFLE_FETCHER_READ_RETRY_MAX_NUM("tajo.shuffle.fetcher.read.retry.max-num", 5),
+ SHUFFLE_FETCHER_CHUNK_MAX_SIZE("tajo.shuffle.fetcher.chunk.max-size", 8192),
+ SHUFFLE_FETCHER_READ_TIMEOUT("tajo.shuffle.fetcher.read.timeout-sec", 120),
+ SHUFFLE_FETCHER_READ_RETRY_MAX_NUM("tajo.shuffle.fetcher.read.retry.max-num", 20),
//////////////////////////////////
// Storage Configuration
http://git-wip-us.apache.org/repos/asf/tajo/blob/242d6ad6/tajo-core/src/main/java/org/apache/tajo/engine/function/math/Round.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/function/math/Round.java b/tajo-core/src/main/java/org/apache/tajo/engine/function/math/Round.java
index e457791..cdcb70a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/function/math/Round.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/math/Round.java
@@ -42,7 +42,7 @@ import org.apache.tajo.storage.Tuple;
paramTypes = {@ParamTypes(paramTypes = {TajoDataTypes.Type.FLOAT4}),
@ParamTypes(paramTypes = {TajoDataTypes.Type.FLOAT8}),
@ParamTypes(paramTypes = {TajoDataTypes.Type.INT4}),
- @ParamTypes(paramTypes = {TajoDataTypes.Type.INT8}),
+ @ParamTypes(paramTypes = {TajoDataTypes.Type.INT8})
}
)
public class Round extends GeneralFunction {
http://git-wip-us.apache.org/repos/asf/tajo/blob/242d6ad6/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
index 826052d..f52d143 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
@@ -204,7 +204,10 @@ public class QueryMasterManagerService extends CompositeService
try {
QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(
new QueryId(report.getId().getQueryUnitId().getExecutionBlockId().getQueryId()));
- queryMasterTask.getEventHandler().handle(new TaskCompletionEvent(report));
+ // queryMaster terminated by internal error before task has not done
+ if (queryMasterTask != null) {
+ queryMasterTask.getEventHandler().handle(new TaskCompletionEvent(report));
+ }
done.run(TajoWorker.TRUE_PROTO);
} catch (Exception e) {
LOG.error(e.getMessage(), e);
http://git-wip-us.apache.org/repos/asf/tajo/blob/242d6ad6/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java b/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
index 2aa2875..1b95238 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
@@ -116,9 +116,9 @@ public class Fetcher {
public File get() throws IOException {
this.startTime = System.currentTimeMillis();
this.state = TajoProtos.FetcherState.FETCH_FETCHING;
-
+ ChannelFuture future = null;
try {
- ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));
+ future = bootstrap.connect(new InetSocketAddress(host, port));
// Wait until the connection attempt succeeds or fails.
Channel channel = future.awaitUninterruptibly().getChannel();
@@ -145,10 +145,13 @@ public class Fetcher {
channelFuture.addListener(ChannelFutureListener.CLOSE);
- // Close the channel to exit.
- future.getChannel().close();
return file;
} finally {
+ if(future != null){
+ // Close the channel to exit.
+ future.getChannel().close();
+ }
+
this.finishTime = System.currentTimeMillis();
LOG.info("Status: " + getState() + ", URI:" + uri);
if (timer != null) {
@@ -249,7 +252,6 @@ public class Fetcher {
}
if(fileLen == length){
- IOUtils.cleanup(LOG, fc, raf);
finishTime = System.currentTimeMillis();
state = TajoProtos.FetcherState.FETCH_FINISHED;
}
@@ -265,15 +267,26 @@ public class Fetcher {
LOG.error("Fetch failed :", e.getCause());
}
- if(ctx.getChannel().isConnected()){
- ctx.getChannel().close().setFailure(e.getCause());
- }
-
// this fetching will be retry
IOUtils.cleanup(LOG, fc, raf);
+ if(ctx.getChannel().isConnected()){
+ ctx.getChannel().close();
+ }
finishTime = System.currentTimeMillis();
state = TajoProtos.FetcherState.FETCH_FAILED;
}
+
+ @Override
+ public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
+ super.channelDisconnected(ctx, e);
+
+ if(getState() != TajoProtos.FetcherState.FETCH_FINISHED){
+ //channel is closed, but cannot complete fetcher
+ finishTime = System.currentTimeMillis();
+ state = TajoProtos.FetcherState.FETCH_FAILED;
+ }
+ IOUtils.cleanup(LOG, fc, raf);
+ }
}
class HttpClientPipelineFactory implements
http://git-wip-us.apache.org/repos/asf/tajo/blob/242d6ad6/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
index 3a4536a..195b35e 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
@@ -626,7 +626,7 @@ public class Task {
if (fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED && fetched != null) {
break;
}
- } catch (IOException e) {
+ } catch (Throwable e) {
LOG.error("Fetch failed: " + fetcher.getURI(), e);
}
retryNum++;
http://git-wip-us.apache.org/repos/asf/tajo/blob/242d6ad6/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java b/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java
index 82d662b..c13842b 100644
--- a/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java
+++ b/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java
@@ -36,8 +36,7 @@ import java.io.IOException;
import java.net.URI;
import java.util.Random;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.*;
public class TestFetcher {
private String TEST_DATA = "target/test-data/TestFetcher";
@@ -195,4 +194,30 @@ public class TestFetcher {
fetcher.get();
assertEquals(TajoProtos.FetcherState.FETCH_FAILED, fetcher.getState());
}
+
+ @Test
+ public void testServerFailure() throws Exception {
+ QueryId queryId = QueryIdFactory.NULL_QUERY_ID;
+ String sid = "1";
+ String ta = "1_0";
+ String partId = "1";
+
+ String dataPath = INPUT_DIR + queryId.toString() + "/output"+ "/" + sid + "/" +ta + "/output/" + partId;
+ String params = String.format("qid=%s&sid=%s&p=%s&type=%s&ta=%s", queryId, sid, partId, "h", ta);
+
+ URI uri = URI.create("http://127.0.0.1:" + pullServerService.getPort() + "/?" + params);
+ final Fetcher fetcher = new Fetcher(conf, uri, new File(OUTPUT_DIR + "data"), channelFactory);
+ assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState());
+
+ pullServerService.stop();
+
+ boolean failure = false;
+ try{
+ fetcher.get();
+ } catch (Throwable e){
+ failure = true;
+ }
+ assertTrue(failure);
+ assertEquals(TajoProtos.FetcherState.FETCH_FAILED, fetcher.getState());
+ }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/242d6ad6/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
----------------------------------------------------------------------
diff --git a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
index 12cd1a3..3b0ee1f 100644
--- a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
+++ b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
@@ -57,7 +57,6 @@ import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.*;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.DefaultChannelGroup;
-import org.jboss.netty.handler.codec.frame.TooLongFrameException;
import org.jboss.netty.handler.codec.http.*;
import org.jboss.netty.handler.ssl.SslHandler;
import org.jboss.netty.handler.stream.ChunkedWriteHandler;
@@ -491,27 +490,33 @@ public class TajoPullServerService extends AbstractService {
private ChannelFuture sendFile(ChannelHandlerContext ctx,
Channel ch,
FileChunk file) throws IOException {
- RandomAccessFile spill;
+ RandomAccessFile spill = null;
+ ChannelFuture writeFuture;
try {
spill = new RandomAccessFile(file.getFile(), "r");
+ if (ch.getPipeline().get(SslHandler.class) == null) {
+ final FadvisedFileRegionWrapper filePart = new FadvisedFileRegionWrapper(spill,
+ file.startOffset, file.length(), manageOsCache, readaheadLength,
+ readaheadPool, file.getFile().getAbsolutePath());
+ writeFuture = ch.write(filePart);
+ writeFuture.addListener(new FileCloseListener(filePart));
+ } else {
+ // HTTPS cannot be done with zero copy.
+ final FadvisedChunkedFile chunk = new FadvisedChunkedFile(spill,
+ file.startOffset, file.length, sslFileBufferSize,
+ manageOsCache, readaheadLength, readaheadPool,
+ file.getFile().getAbsolutePath());
+ writeFuture = ch.write(chunk);
+ }
} catch (FileNotFoundException e) {
LOG.info(file.getFile() + " not found");
return null;
- }
- ChannelFuture writeFuture;
- if (ch.getPipeline().get(SslHandler.class) == null) {
- final FadvisedFileRegionWrapper filePart = new FadvisedFileRegionWrapper(spill,
- file.startOffset, file.length(), manageOsCache, readaheadLength,
- readaheadPool, file.getFile().getAbsolutePath());
- writeFuture = ch.write(filePart);
- writeFuture.addListener(new FileCloseListener(filePart));
- } else {
- // HTTPS cannot be done with zero copy.
- final FadvisedChunkedFile chunk = new FadvisedChunkedFile(spill,
- file.startOffset, file.length, sslFileBufferSize,
- manageOsCache, readaheadLength, readaheadPool,
- file.getFile().getAbsolutePath());
- writeFuture = ch.write(chunk);
+ } catch (Throwable e) {
+ if (spill != null) {
+ //should close a opening file
+ spill.close();
+ }
+ return null;
}
metrics.shuffleConnections.incr();
metrics.shuffleOutputBytes.incr(file.length); // optimistic
@@ -537,17 +542,10 @@ public class TajoPullServerService extends AbstractService {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
throws Exception {
- Channel ch = e.getChannel();
- Throwable cause = e.getCause();
- if (cause instanceof TooLongFrameException) {
- sendError(ctx, BAD_REQUEST);
- return;
- }
-
- LOG.error("PullServer error: ", cause);
- if (ch.isConnected()) {
- LOG.error("PullServer error " + e);
- sendError(ctx, INTERNAL_SERVER_ERROR);
+ LOG.error(e.getCause().getMessage(), e.getCause());
+ //if channel.close() is not called, never closed files in this request
+ if (ctx.getChannel().isConnected()){
+ ctx.getChannel().close();
}
}
}