You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2015/04/29 09:30:50 UTC
[2/6] drill git commit: DRILL-2901: Additional fragment state fixes due to cancellation during various fragment executor state initialization.
DRILL-2901: Additional fragment state fixes due to cancellation during various fragment executor state initialization.
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/0ae5069c
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/0ae5069c
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/0ae5069c
Branch: refs/heads/master
Commit: 0ae5069cefd1e12ac29ae1c1d129b148b39d73f5
Parents: a7a60a2
Author: Jacques Nadeau <ja...@apache.org>
Authored: Sat Apr 18 21:38:47 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Wed Apr 29 00:28:44 2015 -0700
----------------------------------------------------------------------
.../exec/physical/impl/SingleSenderCreator.java | 12 ++++-----
.../PartitionSenderRootExec.java | 22 +++++++++++++--
.../drill/exec/record/AbstractRecordBatch.java | 6 ++++-
.../exec/record/AbstractSingleRecordBatch.java | 6 ++++-
.../drill/exec/record/VectorContainer.java | 6 ++++-
.../exec/server/TestDrillbitResilience.java | 28 +++++++++++---------
6 files changed, 56 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/0ae5069c/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
index 29d032d..18ea71d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
@@ -17,8 +17,6 @@
*/
package org.apache.drill.exec.physical.impl;
-import io.netty.buffer.ByteBuf;
-
import java.util.List;
import org.apache.drill.common.exceptions.ExecutionSetupException;
@@ -29,12 +27,10 @@ import org.apache.drill.exec.ops.MetricDef;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.config.SingleSender;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
-import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
+import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.FragmentWritableBatch;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.RecordBatch.IterOutcome;
-import org.apache.drill.exec.rpc.BaseRpcOutcomeListener;
-import org.apache.drill.exec.rpc.RpcException;
public class SingleSenderCreator implements RootCreator<SingleSender>{
@@ -101,9 +97,13 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
switch (out) {
case STOP:
case NONE:
+ // if we didn't do anything yet, send an empty schema.
+ final BatchSchema sendSchema = incoming.getSchema() == null ? BatchSchema.newBuilder().build() : incoming
+ .getSchema();
+
FragmentWritableBatch b2 = FragmentWritableBatch.getEmptyLastWithSchema(handle.getQueryId(),
handle.getMajorFragmentId(), handle.getMinorFragmentId(), recMajor, oppositeHandle.getMinorFragmentId(),
- incoming.getSchema());
+ sendSchema);
stats.startWait();
try {
tunnel.sendRecordBatch(b2);
http://git-wip-us.apache.org/repos/asf/drill/blob/0ae5069c/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
index 8965bab..7e3f4b2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
@@ -51,6 +51,7 @@ import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.vector.CopyUtil;
+import com.carrotsearch.hppc.IntArrayList;
import com.google.common.annotations.VisibleForTesting;
import com.sun.codemodel.JExpr;
import com.sun.codemodel.JExpression;
@@ -79,6 +80,8 @@ public class PartitionSenderRootExec extends BaseRootExec {
protected final int numberPartitions;
protected final int actualPartitions;
+ private IntArrayList terminations = new IntArrayList();
+
public enum Metric implements MetricDef {
BATCHES_SENT,
RECORDS_SENT,
@@ -238,7 +241,15 @@ public class PartitionSenderRootExec extends BaseRootExec {
subPartitioners.get(i).setup(context, incoming, popConfig, partitionStats, oContext,
startIndex, endIndex);
}
- partitioner = new PartitionerDecorator(subPartitioners, stats, context);
+
+ synchronized(this){
+ partitioner = new PartitionerDecorator(subPartitioners, stats, context);
+ for (int index = 0; index < terminations.size(); index++) {
+ partitioner.getOutgoingBatches(terminations.buffer[index]).terminate();
+ }
+ terminations.clear();
+ }
+
}
private List<Partitioner> createClassInstances(int actualPartitions) throws SchemaChangeException {
@@ -296,7 +307,14 @@ public class PartitionSenderRootExec extends BaseRootExec {
public void receivingFragmentFinished(FragmentHandle handle) {
final int id = handle.getMinorFragmentId();
if (remainingReceivers.compareAndSet(id, 0, 1)) {
- partitioner.getOutgoingBatches(id).terminate();
+ synchronized (this) {
+ if (partitioner == null) {
+ terminations.add(id);
+ } else {
+ partitioner.getOutgoingBatches(id).terminate();
+ }
+ }
+
int remaining = remaingReceiverCount.decrementAndGet();
if (remaining == 0) {
done = true;
http://git-wip-us.apache.org/repos/asf/drill/blob/0ae5069c/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
index 215f580..c96cb7c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
@@ -154,7 +154,11 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
@Override
public BatchSchema getSchema() {
- return container.getSchema();
+ if (container.hasSchema()) {
+ return container.getSchema();
+ } else {
+ return null;
+ }
}
protected void buildSchema() throws SchemaChangeException {
http://git-wip-us.apache.org/repos/asf/drill/blob/0ae5069c/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
index f895f47..3cfe177 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
@@ -123,7 +123,11 @@ public abstract class AbstractSingleRecordBatch<T extends PhysicalOperator> exte
@Override
public BatchSchema getSchema() {
- return container.getSchema();
+ if (container.hasSchema()) {
+ return container.getSchema();
+ } else {
+ return null;
+ }
}
protected abstract boolean setupNewSchema() throws SchemaChangeException;
http://git-wip-us.apache.org/repos/asf/drill/blob/0ae5069c/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
index 5a192a0..e5f4be1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
@@ -25,7 +25,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.Set;
-import com.google.common.collect.Sets;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.exec.expr.TypeHelper;
@@ -36,6 +35,7 @@ import org.apache.drill.exec.vector.ValueVector;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
public class VectorContainer implements Iterable<VectorWrapper<?>>, VectorAccessible {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(VectorContainer.class);
@@ -257,6 +257,10 @@ public class VectorContainer implements Iterable<VectorWrapper<?>>, VectorAccess
return va.getChildWrapper(fieldIds);
}
+ public boolean hasSchema() {
+ return schema != null;
+ }
+
public BatchSchema getSchema() {
Preconditions
.checkNotNull(schema,
http://git-wip-us.apache.org/repos/asf/drill/blob/0ae5069c/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
index ee0e841..49c8833 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
@@ -17,6 +17,16 @@
*/
package org.apache.drill.exec.server;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
import org.apache.commons.math3.util.Pair;
import org.apache.drill.QueryTestUtil;
import org.apache.drill.SingleRowListener;
@@ -61,16 +71,6 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
/**
* Test how resilient drillbits are to throwing exceptions during various phases of query
* execution by injecting exceptions at various points. The test cases are mentioned in DRILL-2383.
@@ -88,7 +88,7 @@ public class TestDrillbitResilience {
* counting sys.drillbits.
*/
private static final String TEST_QUERY = "select * from sys.memory";
- private static final long PAUSE_TIME_MILLIS = 1000L;
+ private static final long PAUSE_TIME_MILLIS = 3000L;
private static void startDrillbit(final String name, final RemoteServiceSet remoteServiceSet) {
if (drillbits.containsKey(name)) {
@@ -456,8 +456,10 @@ public class TestDrillbitResilience {
QueryTestUtil.testWithListener(drillClient, QueryType.SQL, TEST_QUERY, listener);
final Pair<QueryState, Exception> result = listener.waitForCompletion();
- assertTrue(result.getFirst() == QueryState.CANCELED);
- assertTrue(result.getSecond() == null);
+ assertTrue(String.format("Expected Query Outcome of CANCELED but had Outcome of %s", result.getFirst()),
+ result.getFirst() == QueryState.CANCELED);
+ assertTrue(String.format("Expected no Exception but had Exception %s", result.getSecond()),
+ result.getSecond() == null);
}
@Test // Cancellation TC 1