You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2013/08/23 02:06:49 UTC

[6/8] git commit: DRILL-180: Propagate exceptions back to the client. Fail all listeners whenever any query result without a queryId is received. Fix RunningFragmentManager and Foreman to include the queryId when building results. @Ignore ConstantROPTest.testRefInterp() until the plan is updated.

DRILL-180: Propagate exceptions back to the client.
Fail all registered result listeners whenever any query result without a queryId is received.
Fix RunningFragmentManager and Foreman to include the queryId when building results.
@Ignore ConstantROPTest.testRefInterp() until the plan is updated.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/0fc89a31
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/0fc89a31
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/0fc89a31

Branch: refs/heads/master
Commit: 0fc89a317251e684c797ad0e5e5c68c3842ab3b3
Parents: 7edd361
Author: Steven Phillips <sp...@maprtech.com>
Authored: Thu Aug 22 16:53:19 2013 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Thu Aug 22 16:53:19 2013 -0700

----------------------------------------------------------------------
 .../drill/exec/rpc/RpcExceptionHandler.java     |  2 +-
 .../drill/exec/rpc/user/QueryResultHandler.java | 10 ++++++++
 .../exec/work/batch/BitComHandlerImpl.java      |  9 +++++++
 .../apache/drill/exec/work/foreman/Foreman.java |  1 +
 .../work/foreman/RunningFragmentManager.java    |  5 +++-
 .../store/parquet/ParquetRecordReaderTest.java  |  5 ++--
 .../drill/exec/ref/rops/ConstantROPTest.java    | 26 +++++++++-----------
 .../exec/ref/src/test/resources/constant2.json  |  2 +-
 8 files changed, 40 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0fc89a31/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcExceptionHandler.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcExceptionHandler.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcExceptionHandler.java
index a0aed94..0123cad 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcExceptionHandler.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcExceptionHandler.java
@@ -30,7 +30,7 @@ public class RpcExceptionHandler implements ChannelHandler{
   @Override
   public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
     if(!ctx.channel().isOpen()) return;
-    logger.info("Exception in pipeline.  Closing channel between local " + ctx.channel().localAddress() + " and remote " + ctx.channel().remoteAddress(), cause);
+    logger.error("Exception in pipeline.  Closing channel between local " + ctx.channel().localAddress() + " and remote " + ctx.channel().remoteAddress(), cause);
     ctx.close();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0fc89a31/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
index b2283a2..50f8c5a 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
@@ -63,6 +63,9 @@ public class QueryResultHandler {
       l = resultsListener.putIfAbsent(result.getQueryId(), bl);
       // if we had a succesful insert, use that reference.  Otherwise, just throw away the new bufering listener.
       if (l == null) l = bl;
+      if (result.getQueryId().toString().equals("")) {
+        failAll();
+      }
     }
       
     if(failed){
@@ -80,6 +83,13 @@ public class QueryResultHandler {
       resultsListener.remove(result.getQueryId(), l);
     }
 
+
+  }
+
+  private void failAll() {
+    for (UserResultsListener l : resultsListener.values()) {
+      l.submissionFailed(new RpcException("Received result without QueryId"));
+    }
   }
 
   

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0fc89a31/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BitComHandlerImpl.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BitComHandlerImpl.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BitComHandlerImpl.java
index 5807c87..8cba493 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BitComHandlerImpl.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BitComHandlerImpl.java
@@ -124,6 +124,15 @@ public class BitComHandlerImpl implements BitComHandler {
       listener.fail(fragment.getHandle(), "Failure while parsing fragment execution plan.", e);
     }catch(ExecutionSetupException e){
       listener.fail(fragment.getHandle(), "Failure while setting up execution plan.", e);
+    } catch (Exception e) {
+      listener.fail(fragment.getHandle(), "Failure due to uncaught exception", e);
+    } catch (OutOfMemoryError t) {
+      if(t.getMessage().startsWith("Direct buffer")){
+        listener.fail(fragment.getHandle(), "Failure due to error", t);  
+      }else{
+        throw t;
+      }
+      
     }
     
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0fc89a31/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index c1fd9e5..bd64938 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -106,6 +106,7 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
         .addError(error) //
         .setIsLastChunk(true) //
         .setQueryState(QueryState.FAILED) //
+        .setQueryId(queryId) //
         .build();
     cleanupAndSendResult(result);
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0fc89a31/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/RunningFragmentManager.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/RunningFragmentManager.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/RunningFragmentManager.java
index da2f7c1..9d9aca6 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/RunningFragmentManager.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/RunningFragmentManager.java
@@ -147,7 +147,10 @@ class RunningFragmentManager implements FragmentStatusListener{
     updateStatus(status);
     int remaining = remainingFragmentCount.decrementAndGet();
     if(remaining == 0){
-      QueryResult result = QueryResult.newBuilder().setQueryState(QueryState.COMPLETED).build();
+      QueryResult result = QueryResult.newBuilder() //
+              .setQueryState(QueryState.COMPLETED) //
+              .setQueryId(queryId) //
+              .build();
       foreman.cleanupAndSendResult(result);
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0fc89a31/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
index 7a99c3f..fc5bc81 100644
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
@@ -103,17 +103,16 @@ public class ParquetRecordReaderTest {
   }
 
   @Test
-  @Ignore
   public void testLocalDistributed() throws Exception {
     String planName = "/parquet/parquet_scan_union_screen_physical.json";
-    testParquetFullEngineLocalTextDistributed(planName, fileName, 1, 20, 300000);
+    testParquetFullEngineLocalTextDistributed(planName, fileName, 1, numberRowGroups, recordsPerRowGroup);
   }
 
   @Test
   @Ignore
   public void testRemoteDistributed() throws Exception {
     String planName = "/parquet/parquet_scan_union_screen_physical.json";
-    testParquetFullEngineRemote(planName, fileName, 1, 10, 30000);
+    testParquetFullEngineRemote(planName, fileName, 1, numberRowGroups, recordsPerRowGroup);
   }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0fc89a31/sandbox/prototype/exec/ref/src/test/java/org/apache/drill/exec/ref/rops/ConstantROPTest.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/test/java/org/apache/drill/exec/ref/rops/ConstantROPTest.java b/sandbox/prototype/exec/ref/src/test/java/org/apache/drill/exec/ref/rops/ConstantROPTest.java
index 9aea930..353e66d 100644
--- a/sandbox/prototype/exec/ref/src/test/java/org/apache/drill/exec/ref/rops/ConstantROPTest.java
+++ b/sandbox/prototype/exec/ref/src/test/java/org/apache/drill/exec/ref/rops/ConstantROPTest.java
@@ -1,24 +1,25 @@
 package org.apache.drill.exec.ref.rops;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Charsets;
-import com.google.common.io.Files;
+import java.util.Collection;
+
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.expression.ExpressionPosition;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.LogicalPlan;
 import org.apache.drill.common.logical.data.Constant;
 import org.apache.drill.common.util.FileUtils;
-import org.apache.drill.exec.ref.*;
+import org.apache.drill.exec.ref.IteratorRegistry;
+import org.apache.drill.exec.ref.RecordIterator;
+import org.apache.drill.exec.ref.RecordPointer;
+import org.apache.drill.exec.ref.ReferenceInterpreter;
+import org.apache.drill.exec.ref.RunOutcome;
 import org.apache.drill.exec.ref.eval.BasicEvaluatorFactory;
 import org.apache.drill.exec.ref.rse.RSERegistry;
 import org.apache.drill.exec.ref.values.ScalarValues;
+import org.junit.Ignore;
 import org.junit.Test;
 
-
-import java.io.File;
-import java.util.Collection;
-import java.util.Iterator;
+import com.fasterxml.jackson.databind.ObjectMapper;
 
 /**
  * Created with IntelliJ IDEA.
@@ -64,9 +65,8 @@ public class ConstantROPTest {
     // not sure if we want to keep this as a test and check the results. Now that the internals of the ConstantROP work
     // it might now be worth running the reference intepreter with every build
     @Test
-    public void testRefInterp(){
-
-        try{
+    @Ignore // this plan needs to be updated.
+    public void testRefInterp() throws Exception{
             DrillConfig config = DrillConfig.create();
             final String jsonFile = "/constant2.json";
             LogicalPlan plan = LogicalPlan.parse(config, FileUtils.getResourceAsString(jsonFile));
@@ -89,8 +89,6 @@ public class ConstantROPTest {
                     outcome.exception.printStackTrace();
                 }
             }
-        } catch (Exception e) {
-            e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
-        }
+
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0fc89a31/sandbox/prototype/exec/ref/src/test/resources/constant2.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/test/resources/constant2.json b/sandbox/prototype/exec/ref/src/test/resources/constant2.json
index 31fed5d..bad1aa3 100644
--- a/sandbox/prototype/exec/ref/src/test/resources/constant2.json
+++ b/sandbox/prototype/exec/ref/src/test/resources/constant2.json
@@ -1,6 +1,6 @@
 {
   head:{
-    type:"apache_drill_logical_plan",
+    type:"APACHE_DRILL_LOGICAL",
     version:"1",
     generator:{
       type:"manual",