You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by jh...@apache.org on 2014/08/05 10:16:40 UTC

git commit: TAJO-949: PullServer does not release files, when a channel throws an internal exception. (jinho)

Repository: tajo
Updated Branches:
  refs/heads/master 0f3412a74 -> 242d6ad65


TAJO-949: PullServer does not release files, when a channel throws an internal exception. (jinho)

Closes #76


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/242d6ad6
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/242d6ad6
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/242d6ad6

Branch: refs/heads/master
Commit: 242d6ad656685b82f2c552aa09abe862107ae366
Parents: 0f3412a
Author: jinossy <ji...@gmail.com>
Authored: Tue Aug 5 17:14:47 2014 +0900
Committer: jinossy <ji...@gmail.com>
Committed: Tue Aug 5 17:14:47 2014 +0900

----------------------------------------------------------------------
 CHANGES                                         |  3 ++
 .../java/org/apache/tajo/conf/TajoConf.java     |  6 +--
 .../apache/tajo/engine/function/math/Round.java |  2 +-
 .../querymaster/QueryMasterManagerService.java  |  5 +-
 .../java/org/apache/tajo/worker/Fetcher.java    | 31 +++++++----
 .../main/java/org/apache/tajo/worker/Task.java  |  2 +-
 .../org/apache/tajo/worker/TestFetcher.java     | 29 ++++++++++-
 .../tajo/pullserver/TajoPullServerService.java  | 54 ++++++++++----------
 8 files changed, 87 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/242d6ad6/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index be71bf4..e1d6d03 100644
--- a/CHANGES
+++ b/CHANGES
@@ -105,6 +105,9 @@ Release 0.9.0 - unreleased
     (Hyoungjun Kim via hyunsik)
 
   BUG FIXES
+
+    TAJO-949: PullServer does not release files, when a channel throws 
+    an internal exception. (jinho)
     
     TAJO-975: alias name which is the same to existing column name may cause
     NPE during PPD. (hyunsik)

http://git-wip-us.apache.org/repos/asf/tajo/blob/242d6ad6/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index d5e8bc4..b75530b 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -217,9 +217,9 @@ public class TajoConf extends Configuration {
     SHUFFLE_SSL_ENABLED_KEY("tajo.pullserver.ssl.enabled", false),
     SHUFFLE_FILE_FORMAT("tajo.shuffle.file-format", "RAW"),
     SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM("tajo.shuffle.fetcher.parallel-execution.max-num", 2),
-    SHUFFLE_FETCHER_CHUNK_MAX_SIZE("tajo.shuffle.fetcher.chunk.max-size",  8192 * 8),
-    SHUFFLE_FETCHER_READ_TIMEOUT("tajo.shuffle.fetcher.read.timeout-sec", 5),
-    SHUFFLE_FETCHER_READ_RETRY_MAX_NUM("tajo.shuffle.fetcher.read.retry.max-num", 5),
+    SHUFFLE_FETCHER_CHUNK_MAX_SIZE("tajo.shuffle.fetcher.chunk.max-size",  8192),
+    SHUFFLE_FETCHER_READ_TIMEOUT("tajo.shuffle.fetcher.read.timeout-sec", 120),
+    SHUFFLE_FETCHER_READ_RETRY_MAX_NUM("tajo.shuffle.fetcher.read.retry.max-num", 20),
 
     //////////////////////////////////
     // Storage Configuration

http://git-wip-us.apache.org/repos/asf/tajo/blob/242d6ad6/tajo-core/src/main/java/org/apache/tajo/engine/function/math/Round.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/function/math/Round.java b/tajo-core/src/main/java/org/apache/tajo/engine/function/math/Round.java
index e457791..cdcb70a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/function/math/Round.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/math/Round.java
@@ -42,7 +42,7 @@ import org.apache.tajo.storage.Tuple;
     paramTypes = {@ParamTypes(paramTypes = {TajoDataTypes.Type.FLOAT4}),
         @ParamTypes(paramTypes = {TajoDataTypes.Type.FLOAT8}),
         @ParamTypes(paramTypes = {TajoDataTypes.Type.INT4}),
-        @ParamTypes(paramTypes = {TajoDataTypes.Type.INT8}),
+        @ParamTypes(paramTypes = {TajoDataTypes.Type.INT8})
     }
 )
 public class Round extends GeneralFunction {

http://git-wip-us.apache.org/repos/asf/tajo/blob/242d6ad6/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
index 826052d..f52d143 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
@@ -204,7 +204,10 @@ public class QueryMasterManagerService extends CompositeService
     try {
       QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(
           new QueryId(report.getId().getQueryUnitId().getExecutionBlockId().getQueryId()));
-      queryMasterTask.getEventHandler().handle(new TaskCompletionEvent(report));
+      // queryMasterTask can be null if the query master was terminated by an internal error before the task finished
+      if (queryMasterTask != null) {
+        queryMasterTask.getEventHandler().handle(new TaskCompletionEvent(report));
+      }
       done.run(TajoWorker.TRUE_PROTO);
     } catch (Exception e) {
       LOG.error(e.getMessage(), e);

http://git-wip-us.apache.org/repos/asf/tajo/blob/242d6ad6/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java b/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
index 2aa2875..1b95238 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
@@ -116,9 +116,9 @@ public class Fetcher {
   public File get() throws IOException {
     this.startTime = System.currentTimeMillis();
     this.state = TajoProtos.FetcherState.FETCH_FETCHING;
-
+    ChannelFuture future = null;
     try {
-      ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));
+      future = bootstrap.connect(new InetSocketAddress(host, port));
 
       // Wait until the connection attempt succeeds or fails.
       Channel channel = future.awaitUninterruptibly().getChannel();
@@ -145,10 +145,13 @@ public class Fetcher {
 
       channelFuture.addListener(ChannelFutureListener.CLOSE);
 
-      // Close the channel to exit.
-      future.getChannel().close();
       return file;
     } finally {
+      if(future != null){
+        // Close the channel to exit.
+        future.getChannel().close();
+      }
+
       this.finishTime = System.currentTimeMillis();
       LOG.info("Status: " + getState() + ", URI:" + uri);
       if (timer != null) {
@@ -249,7 +252,6 @@ public class Fetcher {
         }
 
         if(fileLen == length){
-          IOUtils.cleanup(LOG, fc, raf);
           finishTime = System.currentTimeMillis();
           state = TajoProtos.FetcherState.FETCH_FINISHED;
         }
@@ -265,15 +267,26 @@ public class Fetcher {
         LOG.error("Fetch failed :", e.getCause());
       }
 
-      if(ctx.getChannel().isConnected()){
-        ctx.getChannel().close().setFailure(e.getCause());
-      }
-
       // this fetching will be retry
       IOUtils.cleanup(LOG, fc, raf);
+      if(ctx.getChannel().isConnected()){
+        ctx.getChannel().close();
+      }
       finishTime = System.currentTimeMillis();
       state = TajoProtos.FetcherState.FETCH_FAILED;
     }
+
+    @Override
+    public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
+      super.channelDisconnected(ctx, e);
+
+      if(getState() != TajoProtos.FetcherState.FETCH_FINISHED){
+        // the channel was closed before the fetch finished, so mark the fetcher as failed
+        finishTime = System.currentTimeMillis();
+        state = TajoProtos.FetcherState.FETCH_FAILED;
+      }
+      IOUtils.cleanup(LOG, fc, raf);
+    }
   }
 
   class HttpClientPipelineFactory implements

http://git-wip-us.apache.org/repos/asf/tajo/blob/242d6ad6/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
index 3a4536a..195b35e 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
@@ -626,7 +626,7 @@ public class Task {
             if (fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED && fetched != null) {
               break;
             }
-          } catch (IOException e) {
+          } catch (Throwable e) {
             LOG.error("Fetch failed: " + fetcher.getURI(), e);
           }
           retryNum++;

http://git-wip-us.apache.org/repos/asf/tajo/blob/242d6ad6/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java b/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java
index 82d662b..c13842b 100644
--- a/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java
+++ b/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java
@@ -36,8 +36,7 @@ import java.io.IOException;
 import java.net.URI;
 import java.util.Random;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.*;
 
 public class TestFetcher {
   private String TEST_DATA = "target/test-data/TestFetcher";
@@ -195,4 +194,30 @@ public class TestFetcher {
     fetcher.get();
     assertEquals(TajoProtos.FetcherState.FETCH_FAILED, fetcher.getState());
   }
+
+  @Test
+  public void testServerFailure() throws Exception {
+    QueryId queryId = QueryIdFactory.NULL_QUERY_ID;
+    String sid = "1";
+    String ta = "1_0";
+    String partId = "1";
+
+    String dataPath = INPUT_DIR + queryId.toString() + "/output"+ "/" + sid + "/" +ta + "/output/" + partId;
+    String params = String.format("qid=%s&sid=%s&p=%s&type=%s&ta=%s", queryId, sid, partId, "h", ta);
+
+    URI uri = URI.create("http://127.0.0.1:" + pullServerService.getPort() + "/?" + params);
+    final Fetcher fetcher = new Fetcher(conf, uri, new File(OUTPUT_DIR + "data"), channelFactory);
+    assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState());
+
+    pullServerService.stop();
+
+    boolean failure = false;
+    try{
+      fetcher.get();
+    } catch (Throwable e){
+      failure = true;
+    }
+    assertTrue(failure);
+    assertEquals(TajoProtos.FetcherState.FETCH_FAILED, fetcher.getState());
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/242d6ad6/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
----------------------------------------------------------------------
diff --git a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
index 12cd1a3..3b0ee1f 100644
--- a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
+++ b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
@@ -57,7 +57,6 @@ import org.jboss.netty.buffer.ChannelBuffers;
 import org.jboss.netty.channel.*;
 import org.jboss.netty.channel.group.ChannelGroup;
 import org.jboss.netty.channel.group.DefaultChannelGroup;
-import org.jboss.netty.handler.codec.frame.TooLongFrameException;
 import org.jboss.netty.handler.codec.http.*;
 import org.jboss.netty.handler.ssl.SslHandler;
 import org.jboss.netty.handler.stream.ChunkedWriteHandler;
@@ -491,27 +490,33 @@ public class TajoPullServerService extends AbstractService {
     private ChannelFuture sendFile(ChannelHandlerContext ctx,
                                    Channel ch,
                                    FileChunk file) throws IOException {
-      RandomAccessFile spill;
+      RandomAccessFile spill = null;
+      ChannelFuture writeFuture;
       try {
         spill = new RandomAccessFile(file.getFile(), "r");
+        if (ch.getPipeline().get(SslHandler.class) == null) {
+          final FadvisedFileRegionWrapper filePart = new FadvisedFileRegionWrapper(spill,
+              file.startOffset, file.length(), manageOsCache, readaheadLength,
+              readaheadPool, file.getFile().getAbsolutePath());
+          writeFuture = ch.write(filePart);
+          writeFuture.addListener(new FileCloseListener(filePart));
+        } else {
+          // HTTPS cannot be done with zero copy.
+          final FadvisedChunkedFile chunk = new FadvisedChunkedFile(spill,
+              file.startOffset, file.length, sslFileBufferSize,
+              manageOsCache, readaheadLength, readaheadPool,
+              file.getFile().getAbsolutePath());
+          writeFuture = ch.write(chunk);
+        }
       } catch (FileNotFoundException e) {
         LOG.info(file.getFile() + " not found");
         return null;
-      }
-      ChannelFuture writeFuture;
-      if (ch.getPipeline().get(SslHandler.class) == null) {
-        final FadvisedFileRegionWrapper filePart = new FadvisedFileRegionWrapper(spill,
-            file.startOffset, file.length(), manageOsCache, readaheadLength,
-            readaheadPool, file.getFile().getAbsolutePath());
-        writeFuture = ch.write(filePart);
-        writeFuture.addListener(new FileCloseListener(filePart));
-      } else {
-        // HTTPS cannot be done with zero copy.
-        final FadvisedChunkedFile chunk = new FadvisedChunkedFile(spill,
-            file.startOffset, file.length, sslFileBufferSize,
-            manageOsCache, readaheadLength, readaheadPool,
-            file.getFile().getAbsolutePath());
-        writeFuture = ch.write(chunk);
+      } catch (Throwable e) {
+        if (spill != null) {
+          // close the file that was already opened
+          spill.close();
+        }
+        return null;
       }
       metrics.shuffleConnections.incr();
       metrics.shuffleOutputBytes.incr(file.length); // optimistic
@@ -537,17 +542,10 @@ public class TajoPullServerService extends AbstractService {
     @Override
     public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
         throws Exception {
-      Channel ch = e.getChannel();
-      Throwable cause = e.getCause();
-      if (cause instanceof TooLongFrameException) {
-        sendError(ctx, BAD_REQUEST);
-        return;
-      }
-
-      LOG.error("PullServer error: ", cause);
-      if (ch.isConnected()) {
-        LOG.error("PullServer error " + e);
-        sendError(ctx, INTERNAL_SERVER_ERROR);
+      LOG.error(e.getCause().getMessage(), e.getCause());
+      // if channel.close() is not called, the files opened for this request are never closed
+      if (ctx.getChannel().isConnected()){
+        ctx.getChannel().close();
       }
     }
   }