You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2015/04/29 09:30:50 UTC

[2/6] drill git commit: DRILL-2901: Additional fragment state fixes due to cancellation during various fragment executor state initialization.

DRILL-2901: Additional fragment state fixes due to cancellation during various fragment executor state initialization.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/0ae5069c
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/0ae5069c
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/0ae5069c

Branch: refs/heads/master
Commit: 0ae5069cefd1e12ac29ae1c1d129b148b39d73f5
Parents: a7a60a2
Author: Jacques Nadeau <ja...@apache.org>
Authored: Sat Apr 18 21:38:47 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Wed Apr 29 00:28:44 2015 -0700

----------------------------------------------------------------------
 .../exec/physical/impl/SingleSenderCreator.java | 12 ++++-----
 .../PartitionSenderRootExec.java                | 22 +++++++++++++--
 .../drill/exec/record/AbstractRecordBatch.java  |  6 ++++-
 .../exec/record/AbstractSingleRecordBatch.java  |  6 ++++-
 .../drill/exec/record/VectorContainer.java      |  6 ++++-
 .../exec/server/TestDrillbitResilience.java     | 28 +++++++++++---------
 6 files changed, 56 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/0ae5069c/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
index 29d032d..18ea71d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
@@ -17,8 +17,6 @@
  */
 package org.apache.drill.exec.physical.impl;
 
-import io.netty.buffer.ByteBuf;
-
 import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
@@ -29,12 +27,10 @@ import org.apache.drill.exec.ops.MetricDef;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.config.SingleSender;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
-import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
+import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.FragmentWritableBatch;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.RecordBatch.IterOutcome;
-import org.apache.drill.exec.rpc.BaseRpcOutcomeListener;
-import org.apache.drill.exec.rpc.RpcException;
 
 public class SingleSenderCreator implements RootCreator<SingleSender>{
 
@@ -101,9 +97,13 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
       switch (out) {
       case STOP:
       case NONE:
+        // if we didn't do anything yet, send an empty schema.
+        final BatchSchema sendSchema = incoming.getSchema() == null ? BatchSchema.newBuilder().build() : incoming
+            .getSchema();
+
         FragmentWritableBatch b2 = FragmentWritableBatch.getEmptyLastWithSchema(handle.getQueryId(),
             handle.getMajorFragmentId(), handle.getMinorFragmentId(), recMajor, oppositeHandle.getMinorFragmentId(),
-            incoming.getSchema());
+            sendSchema);
         stats.startWait();
         try {
           tunnel.sendRecordBatch(b2);

http://git-wip-us.apache.org/repos/asf/drill/blob/0ae5069c/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
index 8965bab..7e3f4b2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
@@ -51,6 +51,7 @@ import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.vector.CopyUtil;
 
+import com.carrotsearch.hppc.IntArrayList;
 import com.google.common.annotations.VisibleForTesting;
 import com.sun.codemodel.JExpr;
 import com.sun.codemodel.JExpression;
@@ -79,6 +80,8 @@ public class PartitionSenderRootExec extends BaseRootExec {
   protected final int numberPartitions;
   protected final int actualPartitions;
 
+  private IntArrayList terminations = new IntArrayList();
+
   public enum Metric implements MetricDef {
     BATCHES_SENT,
     RECORDS_SENT,
@@ -238,7 +241,15 @@ public class PartitionSenderRootExec extends BaseRootExec {
       subPartitioners.get(i).setup(context, incoming, popConfig, partitionStats, oContext,
         startIndex, endIndex);
     }
-    partitioner = new PartitionerDecorator(subPartitioners, stats, context);
+
+    synchronized(this){
+      partitioner = new PartitionerDecorator(subPartitioners, stats, context);
+      for (int index = 0; index < terminations.size(); index++) {
+        partitioner.getOutgoingBatches(terminations.buffer[index]).terminate();
+      }
+      terminations.clear();
+    }
+
   }
 
   private List<Partitioner> createClassInstances(int actualPartitions) throws SchemaChangeException {
@@ -296,7 +307,14 @@ public class PartitionSenderRootExec extends BaseRootExec {
   public void receivingFragmentFinished(FragmentHandle handle) {
     final int id = handle.getMinorFragmentId();
     if (remainingReceivers.compareAndSet(id, 0, 1)) {
-      partitioner.getOutgoingBatches(id).terminate();
+      synchronized (this) {
+        if (partitioner == null) {
+          terminations.add(id);
+        } else {
+          partitioner.getOutgoingBatches(id).terminate();
+        }
+      }
+
       int remaining = remaingReceiverCount.decrementAndGet();
       if (remaining == 0) {
         done = true;

http://git-wip-us.apache.org/repos/asf/drill/blob/0ae5069c/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
index 215f580..c96cb7c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
@@ -154,7 +154,11 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
 
   @Override
   public BatchSchema getSchema() {
-    return container.getSchema();
+    if (container.hasSchema()) {
+      return container.getSchema();
+    } else {
+      return null;
+    }
   }
 
   protected void buildSchema() throws SchemaChangeException {

http://git-wip-us.apache.org/repos/asf/drill/blob/0ae5069c/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
index f895f47..3cfe177 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
@@ -123,7 +123,11 @@ public abstract class AbstractSingleRecordBatch<T extends PhysicalOperator> exte
 
   @Override
   public BatchSchema getSchema() {
-    return container.getSchema();
+    if (container.hasSchema()) {
+      return container.getSchema();
+    } else {
+      return null;
+    }
   }
 
   protected abstract boolean setupNewSchema() throws SchemaChangeException;

http://git-wip-us.apache.org/repos/asf/drill/blob/0ae5069c/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
index 5a192a0..e5f4be1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
@@ -25,7 +25,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 
-import com.google.common.collect.Sets;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.exec.expr.TypeHelper;
@@ -36,6 +35,7 @@ import org.apache.drill.exec.vector.ValueVector;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 
 public class VectorContainer implements Iterable<VectorWrapper<?>>, VectorAccessible {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(VectorContainer.class);
@@ -257,6 +257,10 @@ public class VectorContainer implements Iterable<VectorWrapper<?>>, VectorAccess
     return va.getChildWrapper(fieldIds);
   }
 
+  public boolean hasSchema() {
+    return schema != null;
+  }
+
   public BatchSchema getSchema() {
     Preconditions
         .checkNotNull(schema,

http://git-wip-us.apache.org/repos/asf/drill/blob/0ae5069c/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
index ee0e841..49c8833 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
@@ -17,6 +17,16 @@
  */
 package org.apache.drill.exec.server;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
 import org.apache.commons.math3.util.Pair;
 import org.apache.drill.QueryTestUtil;
 import org.apache.drill.SingleRowListener;
@@ -61,16 +71,6 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.slf4j.Logger;
 
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
 /**
  * Test how resilient drillbits are to throwing exceptions during various phases of query
  * execution by injecting exceptions at various points. The test cases are mentioned in DRILL-2383.
@@ -88,7 +88,7 @@ public class TestDrillbitResilience {
    * counting sys.drillbits.
    */
   private static final String TEST_QUERY = "select * from sys.memory";
-  private static final long PAUSE_TIME_MILLIS = 1000L;
+  private static final long PAUSE_TIME_MILLIS = 3000L;
 
   private static void startDrillbit(final String name, final RemoteServiceSet remoteServiceSet) {
     if (drillbits.containsKey(name)) {
@@ -456,8 +456,10 @@ public class TestDrillbitResilience {
 
     QueryTestUtil.testWithListener(drillClient, QueryType.SQL, TEST_QUERY, listener);
     final Pair<QueryState, Exception> result = listener.waitForCompletion();
-    assertTrue(result.getFirst() == QueryState.CANCELED);
-    assertTrue(result.getSecond() == null);
+    assertTrue(String.format("Expected Query Outcome of CANCELED but had Outcome of %s", result.getFirst()),
+        result.getFirst() == QueryState.CANCELED);
+    assertTrue(String.format("Expected no Exception but had Exception %s", result.getSecond()),
+        result.getSecond() == null);
   }
 
   @Test // Cancellation TC 1