You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ve...@apache.org on 2015/03/09 09:23:52 UTC
[5/8] drill git commit: DRILL-133: LocalExchange planning and exec.
http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
index d99f40d..e908538 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
@@ -50,6 +50,7 @@ import org.apache.drill.exec.planner.physical.explain.PrelSequencer;
import org.apache.drill.exec.planner.physical.visitor.ComplexToJsonPrelVisitor;
import org.apache.drill.exec.planner.physical.visitor.ExcessiveExchangeIdentifier;
import org.apache.drill.exec.planner.physical.visitor.FinalColumnReorderer;
+import org.apache.drill.exec.planner.physical.visitor.InsertLocalExchangeVisitor;
import org.apache.drill.exec.planner.physical.visitor.JoinPrelRenameVisitor;
import org.apache.drill.exec.planner.physical.visitor.MemoryEstimationVisitor;
import org.apache.drill.exec.planner.physical.visitor.RelUniqifier;
@@ -290,13 +291,6 @@ public class DefaultSqlHandler extends AbstractSqlHandler {
/* 4.)
- * Next, we add any required selection vector removers given the supported encodings of each
- * operator. This will ultimately move to a new trait but we're managing here for now to avoid
- * introducing new issues in planning before the next release
- */
- phyRelNode = SelectionVectorPrelVisitor.addSelectionRemoversWhereNecessary(phyRelNode);
-
- /* 5.)
* Add ProducerConsumer after each scan if the option is set
* Use the configured queueSize
*/
@@ -308,7 +302,7 @@ public class DefaultSqlHandler extends AbstractSqlHandler {
*/
- /* 6.)
+ /* 5.)
* if the client does not support complex types (Map, Repeated)
* insert a project which which would convert
*/
@@ -317,7 +311,21 @@ public class DefaultSqlHandler extends AbstractSqlHandler {
phyRelNode = ComplexToJsonPrelVisitor.addComplexToJsonPrel(phyRelNode);
}
+
+ /* 6.)
+ * Insert LocalExchange (mux and/or demux) nodes
+ */
+ phyRelNode = InsertLocalExchangeVisitor.insertLocalExchanges(phyRelNode, queryOptions);
+
+
/* 7.)
+ * Next, we add any required selection vector removers given the supported encodings of each
+ * operator. This will ultimately move to a new trait but we're managing here for now to avoid
+ * introducing new issues in planning before the next release
+ */
+ phyRelNode = SelectionVectorPrelVisitor.addSelectionRemoversWhereNecessary(phyRelNode);
+
+ /* 8.)
* Finally, Make sure that the no rels are repeats.
* This could happen in the case of querying the same table twice as Optiq may canonicalize these.
*/
http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 7ad3f77..d821af8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -51,6 +51,8 @@ public class SystemOptionManager implements OptionManager {
PlannerSettings.BROADCAST_THRESHOLD,
PlannerSettings.BROADCAST_FACTOR,
PlannerSettings.JOIN_ROW_COUNT_ESTIMATE_FACTOR,
+ PlannerSettings.MUX_EXCHANGE,
+ PlannerSettings.DEMUX_EXCHANGE,
PlannerSettings.PRODUCER_CONSUMER,
PlannerSettings.PRODUCER_CONSUMER_QUEUE_SIZE,
PlannerSettings.HASH_SINGLE_KEY,
http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
index 9902443..54cad56 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
@@ -167,7 +167,6 @@ public class EasyGroupScan extends AbstractFileGroupScan{
@Override
public List<EndpointAffinity> getOperatorAffinity() {
- assert chunks != null && chunks.size() > 0;
if (endpointAffinities == null) {
logger.debug("chunks: {}", chunks.size());
endpointAffinities = AffinityCreator.getAffinityMap(chunks);
http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java
index aa1609d..262c6be 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java
@@ -69,11 +69,6 @@ public class DirectGroupScan extends AbstractGroupScan{
}
@Override
- public List<EndpointAffinity> getOperatorAffinity() {
- return Collections.emptyList();
- }
-
- @Override
public String getDigest() {
return String.valueOf(reader);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaGroupScan.java
index 8335ed9..22cc483 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaGroupScan.java
@@ -106,11 +106,6 @@ public class InfoSchemaGroupScan extends AbstractGroupScan{
}
@Override
- public List<EndpointAffinity> getOperatorAffinity() {
- return Collections.emptyList();
- }
-
- @Override
public String getDigest() {
return this.table.toString() + ", filter=" + filter;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockGroupScanPOP.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockGroupScanPOP.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockGroupScanPOP.java
index 5736df8..dc90a33 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockGroupScanPOP.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockGroupScanPOP.java
@@ -167,11 +167,6 @@ public class MockGroupScanPOP extends AbstractGroupScan {
}
- @Override
- public List<EndpointAffinity> getOperatorAffinity() {
- return Collections.emptyList();
- }
-
@SuppressWarnings("unchecked")
@Override
public void applyAssignments(List<DrillbitEndpoint> endpoints) {
http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java
index 053f5de..cdd0d18 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java
@@ -100,12 +100,6 @@ public class SystemTableScan extends AbstractGroupScan implements SubScan{
}
@Override
- public List<EndpointAffinity> getOperatorAffinity() {
- return Collections.emptyList();
- }
-
-
- @Override
public int getOperatorType() {
return CoreOperatorType.SYSTEM_TABLE_SCAN_VALUE;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/util/ArrayWrappedIntIntMap.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/ArrayWrappedIntIntMap.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/ArrayWrappedIntIntMap.java
new file mode 100644
index 0000000..4277580
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/ArrayWrappedIntIntMap.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.util;
+
+
+import com.google.common.base.Preconditions;
+
+import java.util.Arrays;
+
+/**
+ * Simple Map type data structure for storing entries of (int -> int) mappings where the max key value is below 2^16.
+ * Values live in a plain array indexed directly by key, which avoids hashing on lookup. Not thread-safe. Keys and
+ * values are expected to be >=0; slots that were never put hold Integer.MIN_VALUE.
+ */
+public class ArrayWrappedIntIntMap {
+  // Parentheses are required: '-' binds tighter than '<<', so "1 << 16 - 1" would evaluate to 1 << 15.
+  private static final int MAX_KEY_VALUE = (1 << 16) - 1;
+  private static final int INITIAL_MAX_KEY_VALUE = (1 << 8) - 1;
+  private int[] values;
+
+  public ArrayWrappedIntIntMap() {
+    values = new int[INITIAL_MAX_KEY_VALUE + 1];
+    Arrays.fill(values, Integer.MIN_VALUE);
+  }
+
+  /**
+   * Stores the mapping key -> value, growing the backing array if the key is beyond its current size.
+   * @throws IllegalArgumentException if key is outside [0, MAX_KEY_VALUE] or value is negative
+   */
+  public void put(final int key, final int value) {
+    Preconditions.checkArgument(key >= 0 && key <= MAX_KEY_VALUE,
+        "Index should be in range [0, %s], given [%s].", MAX_KEY_VALUE, key);
+    Preconditions.checkArgument(value >= 0, "Value must be non-negative, given [%s]", value);
+    if (values.length < key + 1) {
+      // Grow to the next power of 2 above the key and mark every new slot as unset (fill's toIndex is exclusive).
+      final int oldLength = values.length;
+      values = Arrays.copyOf(values, Integer.highestOneBit(key) * 2);
+      Arrays.fill(values, oldLength, values.length, Integer.MIN_VALUE);
+    }
+    values[key] = value;
+  }
+
+  /**
+   * Returns the value stored for the key, or Integer.MIN_VALUE if it lies within the array but was never put().
+   * Bounds are deliberately not checked for faster retrieval: other keys throw ArrayIndexOutOfBoundsException.
+   */
+  public int get(int key) {
+    return values[key];
+  }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java
index c83106c..ed16314 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java
@@ -26,25 +26,33 @@ import java.util.concurrent.atomic.AtomicIntegerArray;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.MinorFragmentEndpoint;
import org.apache.drill.exec.physical.base.Receiver;
-import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.record.RawFragmentBatch;
import com.google.common.base.Preconditions;
+import org.apache.drill.exec.util.ArrayWrappedIntIntMap;
public abstract class AbstractDataCollector implements DataCollector{
- private final List<DrillbitEndpoint> incoming;
+ private final List<MinorFragmentEndpoint> incoming;
private final int oppositeMajorFragmentId;
private final AtomicIntegerArray remainders;
private final AtomicInteger remainingRequired;
- protected final RawBatchBuffer[] buffers;
private final AtomicInteger parentAccounter;
- private final AtomicInteger finishedStreams = new AtomicInteger();
- private final FragmentContext context;
- public AbstractDataCollector(AtomicInteger parentAccounter, Receiver receiver, int minInputsRequired, FragmentContext context) {
- Preconditions.checkArgument(minInputsRequired > 0);
+ protected final RawBatchBuffer[] buffers;
+ protected final ArrayWrappedIntIntMap fragmentMap;
+
+ /**
+ * @param parentAccounter
+ * @param receiver
+ * @param numBuffers Number of RawBatchBuffer inputs required to store the incoming data
+ * @param bufferCapacity Capacity of each RawBatchBuffer.
+ * @param context
+ */
+ public AbstractDataCollector(AtomicInteger parentAccounter, Receiver receiver,
+ final int numBuffers, final int bufferCapacity, FragmentContext context) {
Preconditions.checkNotNull(receiver);
Preconditions.checkNotNull(parentAccounter);
@@ -52,34 +60,43 @@ public abstract class AbstractDataCollector implements DataCollector{
this.incoming = receiver.getProvidingEndpoints();
this.remainders = new AtomicIntegerArray(incoming.size());
this.oppositeMajorFragmentId = receiver.getOppositeMajorFragmentId();
- this.buffers = new RawBatchBuffer[minInputsRequired];
- this.context = context;
+
+ // Create fragmentId to index that is within the range [0, incoming.size()-1]
+ // We use this mapping to find objects belonging to the fragment in buffers and remainders arrays.
+ fragmentMap = new ArrayWrappedIntIntMap();
+ int index = 0;
+ for(MinorFragmentEndpoint endpoint : incoming) {
+ fragmentMap.put(endpoint.getId(), index);
+ index++;
+ }
+
+ buffers = new RawBatchBuffer[numBuffers];
+ remainingRequired = new AtomicInteger(numBuffers);
+
try {
String bufferClassName = context.getConfig().getString(ExecConstants.INCOMING_BUFFER_IMPL);
Constructor<?> bufferConstructor = Class.forName(bufferClassName).getConstructor(FragmentContext.class, int.class);
- for(int i = 0; i < buffers.length; i++) {
- buffers[i] = (RawBatchBuffer) bufferConstructor.newInstance(context, receiver.supportsOutOfOrderExchange() ? incoming.size() : 1);
+
+ for(int i=0; i<numBuffers; i++) {
+ buffers[i] = (RawBatchBuffer) bufferConstructor.newInstance(context, bufferCapacity);
}
} catch (InstantiationException | IllegalAccessException | InvocationTargetException |
NoSuchMethodException | ClassNotFoundException e) {
context.fail(e);
}
- if (receiver.supportsOutOfOrderExchange()) {
- this.remainingRequired = new AtomicInteger(1);
- } else {
- this.remainingRequired = new AtomicInteger(minInputsRequired);
- }
}
+ @Override
public int getOppositeMajorFragmentId() {
return oppositeMajorFragmentId;
}
+ @Override
public RawBatchBuffer[] getBuffers(){
return buffers;
}
-
+ @Override
public boolean batchArrived(int minorFragmentId, RawFragmentBatch batch) throws IOException {
// if we received an out of memory, add an item to all the buffer queues.
@@ -91,7 +108,7 @@ public abstract class AbstractDataCollector implements DataCollector{
// check to see if we have enough fragments reporting to proceed.
boolean decremented = false;
- if (remainders.compareAndSet(minorFragmentId, 0, 1)) {
+ if (remainders.compareAndSet(fragmentMap.get(minorFragmentId), 0, 1)) {
int rem = remainingRequired.decrementAndGet();
if (rem == 0) {
parentAccounter.decrementAndGet();
http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/MergingCollector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/MergingCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/MergingCollector.java
index ce14260..8c09f80 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/MergingCollector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/MergingCollector.java
@@ -24,11 +24,8 @@ import org.apache.drill.exec.physical.base.Receiver;
public class MergingCollector extends AbstractDataCollector{
- private AtomicInteger streamsRunning;
-
public MergingCollector(AtomicInteger parentAccounter, Receiver receiver, FragmentContext context) {
- super(parentAccounter, receiver, 1, context);
- streamsRunning = new AtomicInteger(receiver.getProvidingEndpoints().size());
+ super(parentAccounter, receiver, 1, receiver.getProvidingEndpoints().size(), context);
}
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/PartitionedCollector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/PartitionedCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/PartitionedCollector.java
index 5190d84..7ce9074 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/PartitionedCollector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/PartitionedCollector.java
@@ -25,11 +25,11 @@ import org.apache.drill.exec.physical.base.Receiver;
public class PartitionedCollector extends AbstractDataCollector{
public PartitionedCollector(AtomicInteger parentAccounter, Receiver receiver, FragmentContext context) {
- super(parentAccounter, receiver, receiver.getProvidingEndpoints().size(), context);
+ super(parentAccounter, receiver, receiver.getProvidingEndpoints().size(), 1, context);
}
@Override
protected RawBatchBuffer getBuffer(int minorFragmentId) {
- return buffers[minorFragmentId];
+ return buffers[fragmentMap.get(minorFragmentId)];
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 378e81a..409450f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -47,9 +47,7 @@ import org.apache.drill.exec.physical.config.ExternalSort;
import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch;
import org.apache.drill.exec.planner.fragment.Fragment;
import org.apache.drill.exec.planner.fragment.MakeFragmentsVisitor;
-import org.apache.drill.exec.planner.fragment.PlanningSet;
import org.apache.drill.exec.planner.fragment.SimpleParallelizer;
-import org.apache.drill.exec.planner.fragment.StatsCollector;
import org.apache.drill.exec.planner.sql.DirectPlan;
import org.apache.drill.exec.planner.sql.DrillSqlWorker;
import org.apache.drill.exec.proto.BitControl.InitializeFragments;
@@ -370,15 +368,11 @@ public class Foreman implements Runnable, Closeable, Comparable<Object> {
private QueryWorkUnit getQueryWorkUnit(PhysicalPlan plan) throws ExecutionSetupException {
PhysicalOperator rootOperator = plan.getSortedOperators(false).iterator().next();
- MakeFragmentsVisitor makeFragmentsVisitor = new MakeFragmentsVisitor();
- Fragment rootFragment = rootOperator.accept(makeFragmentsVisitor, null);
- PlanningSet planningSet = StatsCollector.collectStats(rootFragment);
- SimpleParallelizer parallelizer = new SimpleParallelizer(context);
-
+ Fragment rootFragment = rootOperator.accept(MakeFragmentsVisitor.INSTANCE, null);
+ SimpleParallelizer parallelizer = new SimpleParallelizer(context);
return parallelizer.getFragments(context.getOptions().getOptionList(), context.getCurrentEndpoint(),
- queryId, context.getActiveEndpoints(), context.getPlanReader(), rootFragment, planningSet,
- initiatingClient.getSession());
+ queryId, context.getActiveEndpoints(), context.getPlanReader(), rootFragment, initiatingClient.getSession());
}
/**
http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
index cf99577..4d2a0df 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
@@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import com.google.common.base.Preconditions;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.util.TestTools;
import org.apache.drill.exec.ExecConstants;
@@ -45,6 +46,7 @@ import org.apache.drill.exec.rpc.user.ConnectionThrottle;
import org.apache.drill.exec.rpc.user.QueryResultBatch;
import org.apache.drill.exec.rpc.user.UserResultsListener;
import org.apache.drill.exec.server.Drillbit;
+import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.server.RemoteServiceSet;
import org.apache.drill.exec.util.VectorUtil;
import org.junit.AfterClass;
@@ -60,6 +62,13 @@ import com.google.common.io.Resources;
public class BaseTestQuery extends ExecTest{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseTestQuery.class);
+ /**
+ * Number of Drillbits in test cluster. Default is 1.
+ *
+ * Tests can update the cluster size through {@link #setDrillbitCount(int)}
+ */
+ private static int drillbitCount = 1;
+
private int[] columnWidths = new int[] { 8 };
private static final String ENABLE_FULL_CACHE = "drill.exec.test.use-full-cache";
@@ -84,12 +93,37 @@ public class BaseTestQuery extends ExecTest{
};
protected static DrillClient client;
- protected static Drillbit bit;
+ protected static Drillbit[] bits;
protected static RemoteServiceSet serviceSet;
protected static DrillConfig config;
protected static QuerySubmitter submitter = new QuerySubmitter();
protected static BufferAllocator allocator;
+ protected static void setDrillbitCount(int newDrillbitCount) {
+   Preconditions.checkArgument(newDrillbitCount > 0, "Number of Drillbits must be at least one");
+   if (drillbitCount == newDrillbitCount) {
+     return; // requested size is already the configured count; nothing to do
+   }
+   // TODO: revisit to avoid stopping the cluster; for now it is shut down and restarted with the new count.
+   try {
+     closeClient();
+     drillbitCount = newDrillbitCount;
+     openClient();
+   } catch(Exception e) {
+     throw new RuntimeException("Failure while changing the number of Drillbits in test cluster.", e);
+   }
+ }
+
+ /**
+ * Useful for tests that require a DrillbitContext to get/add storage plugins, options etc.
+ *
+ * @return DrillbitContext of first Drillbit in the cluster.
+ */
+ protected static DrillbitContext getDrillbitContext() {
+ Preconditions.checkState(bits != null && bits[0] != null, "Drillbits are not setup.");
+ return bits[0].getContext();
+ }
+
static void resetClientAndBit() throws Exception{
closeClient();
openClient();
@@ -104,8 +138,13 @@ public class BaseTestQuery extends ExecTest{
} else {
serviceSet = RemoteServiceSet.getLocalServiceSet();
}
- bit = new Drillbit(config, serviceSet);
- bit.run();
+
+ bits = new Drillbit[drillbitCount];
+ for(int i=0; i<drillbitCount; i++) {
+ bits[i] = new Drillbit(config, serviceSet);
+ bits[i].run();
+ }
+
client = new DrillClient(config, serviceSet.getCoordinator());
client.connect();
List<QueryResultBatch> results = client.runQuery(QueryType.SQL, String.format("alter session set `%s` = 2", ExecConstants.MAX_WIDTH_PER_NODE_KEY));
@@ -118,11 +157,11 @@ public class BaseTestQuery extends ExecTest{
return allocator;
}
- public TestBuilder newTest() {
+ public static TestBuilder newTest() {
return testBuilder();
}
- public TestBuilder testBuilder() {
+ public static TestBuilder testBuilder() {
return new TestBuilder(allocator);
}
@@ -131,9 +170,15 @@ public class BaseTestQuery extends ExecTest{
if (client != null) {
client.close();
}
- if (bit != null) {
- bit.close();
+
+ if (bits != null) {
+ for(Drillbit bit : bits) {
+ if (bit != null) {
+ bit.close();
+ }
+ }
}
+
if(serviceSet != null) {
serviceSet.close();
}
@@ -142,21 +187,21 @@ public class BaseTestQuery extends ExecTest{
}
}
- protected void runSQL(String sql) throws Exception {
+ protected static void runSQL(String sql) throws Exception {
SilentListener listener = new SilentListener();
testWithListener(QueryType.SQL, sql, listener);
listener.waitForCompletion();
}
- protected List<QueryResultBatch> testSqlWithResults(String sql) throws Exception{
+ protected static List<QueryResultBatch> testSqlWithResults(String sql) throws Exception{
return testRunAndReturn(QueryType.SQL, sql);
}
- protected List<QueryResultBatch> testLogicalWithResults(String logical) throws Exception{
+ protected static List<QueryResultBatch> testLogicalWithResults(String logical) throws Exception{
return testRunAndReturn(QueryType.LOGICAL, logical);
}
- protected List<QueryResultBatch> testPhysicalWithResults(String physical) throws Exception{
+ protected static List<QueryResultBatch> testPhysicalWithResults(String physical) throws Exception{
return testRunAndReturn(QueryType.PHYSICAL, physical);
}
@@ -172,7 +217,7 @@ public class BaseTestQuery extends ExecTest{
return resultListener.await();
}
- protected void testWithListener(QueryType type, String query, UserResultsListener resultListener) {
+ protected static void testWithListener(QueryType type, String query, UserResultsListener resultListener) {
query = normalizeQuery(query);
client.runQuery(type, query, resultListener);
}
@@ -216,15 +261,15 @@ public class BaseTestQuery extends ExecTest{
return query;
}
- protected int testLogical(String query) throws Exception{
+ protected static int testLogical(String query) throws Exception{
return testRunAndPrint(QueryType.LOGICAL, query);
}
- protected int testPhysical(String query) throws Exception{
+ protected static int testPhysical(String query) throws Exception{
return testRunAndPrint(QueryType.PHYSICAL, query);
}
- protected int testSql(String query) throws Exception{
+ protected static int testSql(String query) throws Exception{
return testRunAndPrint(QueryType.SQL, query);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/test/java/org/apache/drill/PlanTestBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/PlanTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/PlanTestBase.java
index c95d2b2..36091af 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/PlanTestBase.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/PlanTestBase.java
@@ -288,12 +288,11 @@ public class PlanTestBase extends BaseTestQuery {
* This will submit an "EXPLAIN" statement, and return the column value which
* contains the plan's string.
*/
- protected String getPlanInString(String sql, String columnName)
+ protected static String getPlanInString(String sql, String columnName)
throws Exception {
List<QueryResultBatch> results = testSqlWithResults(sql);
- RecordBatchLoader loader = new RecordBatchLoader(bit.getContext()
- .getAllocator());
+ RecordBatchLoader loader = new RecordBatchLoader(getDrillbitContext().getAllocator());
StringBuilder builder = new StringBuilder();
for (QueryResultBatch b : results) {
http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestClassTransformation.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestClassTransformation.java b/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestClassTransformation.java
index bb855c9..7ff00fe 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestClassTransformation.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestClassTransformation.java
@@ -43,7 +43,7 @@ public class TestClassTransformation extends BaseTestQuery {
@BeforeClass
public static void beforeTestClassTransformation() throws Exception {
- sessionOptions = new SessionOptionManager(bit.getContext().getOptionManager());
+ sessionOptions = new SessionOptionManager(getDrillbitContext().getOptionManager());
}
@Test
@@ -113,7 +113,7 @@ public class TestClassTransformation extends BaseTestQuery {
private <T, X extends T> CodeGenerator<T> newCodeGenerator(Class<T> iface, Class<X> impl) {
final TemplateClassDefinition<T> template = new TemplateClassDefinition<T>(iface, impl);
- CodeGenerator<T> cg = CodeGenerator.get(template, bit.getContext().getFunctionImplementationRegistry());
+ CodeGenerator<T> cg = CodeGenerator.get(template, getDrillbitContext().getFunctionImplementationRegistry());
ClassGenerator<T> root = cg.getRoot();
root.setMappingSet(new MappingSet(new GeneratorMapping("doOutside", null, null, null)));
http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestLocalExchange.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestLocalExchange.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestLocalExchange.java
new file mode 100644
index 0000000..72715a7
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestLocalExchange.java
@@ -0,0 +1,358 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.drill.PlanTestBase;
+import org.apache.drill.TestBuilder;
+import org.apache.drill.exec.physical.base.Exchange;
+import org.apache.drill.exec.physical.config.UnorderedDeMuxExchange;
+import org.apache.drill.exec.physical.config.HashToRandomExchange;
+import org.apache.drill.exec.planner.PhysicalPlanReader;
+import org.apache.drill.exec.planner.fragment.Fragment;
+import org.apache.drill.exec.planner.fragment.Fragment.ExchangeFragmentPair;
+import org.apache.drill.exec.planner.fragment.PlanningSet;
+import org.apache.drill.exec.planner.fragment.SimpleParallelizer;
+import org.apache.drill.exec.pop.PopUnitTestBase;
+import org.apache.drill.exec.proto.BitControl.PlanFragment;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.rpc.user.UserSession;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.options.OptionList;
+import org.apache.drill.exec.work.QueryWorkUnit;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+/**
+ * This test starts a Drill cluster with CLUSTER_SIZE nodes and generates data for test tables.
+ *
+ * The test queries involve HashToRandomExchange (group by and join) and verify the following:
+ * 1. The plan has mux and demux exchanges inserted.
+ * 2. Running the query produces output that matches the expected baseline records.
+ * 3. Taking the plan we got in (1) and using SimpleParallelizer to get PlanFragments, the number of
+ * partition senders in a major fragment is not more than the number of Drillbit nodes in the cluster and there
+ * exists at most one partition sender per Drillbit.
+ */
+public class TestLocalExchange extends PlanTestBase {
+
+ public static TemporaryFolder testTempFolder = new TemporaryFolder();
+
+ private final static int CLUSTER_SIZE = 3;
+ private final static String MUX_EXCHANGE = "\"unordered-mux-exchange\"";
+ private final static String DEMUX_EXCHANGE = "\"unordered-demux-exchange\"";
+ private final static UserSession USER_SESSION = UserSession.Builder.newBuilder()
+ .withCredentials(UserBitShared.UserCredentials.newBuilder().setUserName("foo").build())
+ .build();
+
+ private static final SimpleParallelizer PARALLELIZER = new SimpleParallelizer(
+ 1 /*parallelizationThreshold (slice_count)*/,
+ 6 /*maxWidthPerNode*/,
+ 1000 /*maxGlobalWidth*/,
+ 1.2 /*affinityFactor*/);
+
+ private final static int NUM_DEPTS = 40;
+ private final static int NUM_EMPLOYEES = 1000;
+
+ private static String empTableLocation;
+ private static String deptTableLocation;
+
+ private static String groupByQuery;
+ private static String joinQuery;
+
+ private static String[] joinQueryBaselineColumns;
+ private static String[] groupByQueryBaselineColumns;
+
+ private static List<Object[]> groupByQueryBaselineValues;
+ private static List<Object[]> joinQueryBaselineValues;
+
+ @BeforeClass
+ public static void setupClusterSize() {
+ setDrillbitCount(CLUSTER_SIZE);
+ }
+
+ @BeforeClass
+ public static void setupTempFolder() throws IOException {
+ testTempFolder.create();
+ }
+
+ /**
+ * Generate data for two tables. Each table consists of several JSON files.
+ */
+ @BeforeClass
+ public static void generateTestDataAndQueries() throws Exception {
+ // Table 1 consists of three columns "emp_id", "emp_name" and "dept_id"
+ empTableLocation = testTempFolder.newFolder().getAbsolutePath();
+
+ // Write 100 records for each new file
+ final int empNumRecsPerFile = 100;
+ for(int fileIndex=0; fileIndex<NUM_EMPLOYEES/empNumRecsPerFile; fileIndex++) {
+ File file = new File(empTableLocation + File.separator + fileIndex + ".json");
+ PrintWriter printWriter = new PrintWriter(file);
+ for (int recordIndex = fileIndex*empNumRecsPerFile; recordIndex < (fileIndex+1)*empNumRecsPerFile; recordIndex++) {
+ String record = String.format("{ \"emp_id\" : %d, \"emp_name\" : \"Employee %d\", \"dept_id\" : %d }",
+ recordIndex, recordIndex, recordIndex % NUM_DEPTS);
+ printWriter.println(record);
+ }
+ printWriter.close();
+ }
+
+ // Table 2 consists of two columns "dept_id" and "dept_name"
+ deptTableLocation = testTempFolder.newFolder().getAbsolutePath();
+
+ // Write 4 records for each new file
+ final int deptNumRecsPerFile = 4;
+ for(int fileIndex=0; fileIndex<NUM_DEPTS/deptNumRecsPerFile; fileIndex++) {
+ File file = new File(deptTableLocation + File.separator + fileIndex + ".json");
+ PrintWriter printWriter = new PrintWriter(file);
+ for (int recordIndex = fileIndex*deptNumRecsPerFile; recordIndex < (fileIndex+1)*deptNumRecsPerFile; recordIndex++) {
+ String record = String.format("{ \"dept_id\" : %d, \"dept_name\" : \"Department %d\" }",
+ recordIndex, recordIndex);
+ printWriter.println(record);
+ }
+ printWriter.close();
+ }
+
+ // Initialize test queries
+ groupByQuery = String.format("SELECT dept_id, count(*) as numEmployees FROM dfs.`%s` GROUP BY dept_id", empTableLocation);
+ joinQuery = String.format("SELECT e.emp_name, d.dept_name FROM dfs.`%s` e JOIN dfs.`%s` d ON e.dept_id = d.dept_id",
+ empTableLocation, deptTableLocation);
+
+ // Generate and store output data for test queries. Used when verifying the output of queries run using different
+ // configurations.
+
+ groupByQueryBaselineColumns = new String[] { "dept_id", "numEmployees" };
+
+ groupByQueryBaselineValues = Lists.newArrayList();
+ // dept_id is generated by the expression 'recordIndex % NUM_DEPTS' above. 'recordIndex' runs from 0 to
+ // NUM_EMPLOYEES - 1, so we expect the number of occurrences of each dept_id to be NUM_EMPLOYEES/NUM_DEPTS
+ // (1000/40 = 25)
+ final int numOccurrances = NUM_EMPLOYEES/NUM_DEPTS;
+ for(int i = 0; i < NUM_DEPTS; i++) {
+ groupByQueryBaselineValues.add(new Object[] { (long)i, (long)numOccurrances});
+ }
+
+ joinQueryBaselineColumns = new String[] { "emp_name", "dept_name" };
+
+ joinQueryBaselineValues = Lists.newArrayList();
+ for(int i = 0; i < NUM_EMPLOYEES; i++) {
+ final String employee = String.format("Employee %d", i);
+ final String dept = String.format("Department %d", i % NUM_DEPTS);
+ joinQueryBaselineValues.add(new String[] { employee, dept });
+ }
+ }
+
+ public static void setupHelper(boolean isMuxOn, boolean isDeMuxOn) throws Exception {
+ // set slice target to 1, so that we can have more parallelization for testing
+ test("ALTER SESSION SET `planner.slice_target`=1");
+ // disable the broadcast join to produce plans with HashToRandomExchanges.
+ test("ALTER SESSION SET `planner.enable_broadcast_join`=false");
+ test("ALTER SESSION SET `planner.enable_mux_exchange`=" + isMuxOn);
+ test("ALTER SESSION SET `planner.enable_demux_exchange`=" + isDeMuxOn);
+ }
+
+ @Test
+ public void testGroupBy_NoMux_NoDeMux() throws Exception {
+ testGroupByHelper(false, false);
+ }
+
+ @Test
+ public void testJoin_NoMux_NoDeMux() throws Exception {
+ testJoinHelper(false, false);
+ }
+
+ @Test
+ public void testGroupBy_Mux_NoDeMux() throws Exception {
+ testGroupByHelper(true, false);
+ }
+
+ @Test
+ public void testJoin_Mux_NoDeMux() throws Exception {
+ testJoinHelper(true, false);
+ }
+
+ @Test
+ public void testGroupBy_NoMux_DeMux() throws Exception {
+ testGroupByHelper(false, true);
+ }
+
+ @Test
+ public void testJoin_NoMux_DeMux() throws Exception {
+ testJoinHelper(false, true);
+ }
+
+ @Test
+ public void testGroupBy_Mux_DeMux() throws Exception {
+ testGroupByHelper(true, true);
+ }
+
+ @Test
+ public void testJoin_Mux_DeMux() throws Exception {
+ testJoinHelper(true, true);
+ }
+
+ private static void testGroupByHelper(boolean isMuxOn, boolean isDeMuxOn) throws Exception {
+ testHelper(isMuxOn, isDeMuxOn, groupByQuery,
+ isMuxOn ? 1 : 0, isDeMuxOn ? 1 : 0,
+ groupByQueryBaselineColumns, groupByQueryBaselineValues);
+ }
+
+ public static void testJoinHelper(boolean isMuxOn, boolean isDeMuxOn) throws Exception {
+ testHelper(isMuxOn, isDeMuxOn, joinQuery,
+ isMuxOn ? 2 : 0, isDeMuxOn ? 2 : 0,
+ joinQueryBaselineColumns, joinQueryBaselineValues);
+ }
+
+ private static void testHelper(boolean isMuxOn, boolean isDeMuxOn, String query,
+ int expectedNumMuxes, int expectedNumDeMuxes, String[] baselineColumns, List<Object[]> baselineValues)
+ throws Exception {
+ setupHelper(isMuxOn, isDeMuxOn);
+
+ String plan = getPlanInString("EXPLAIN PLAN FOR " + query, JSON_FORMAT);
+ System.out.println("Plan: " + plan);
+
+ // Make sure the plan has the expected number of mux and demux exchanges (TODO: currently testing is rudimentary,
+ // need to move to more sophisticated testing once better planning test tools are available)
+ assertEquals("Wrong number of MuxExchanges are present in the plan",
+ expectedNumMuxes, StringUtils.countMatches(plan, MUX_EXCHANGE));
+
+ assertEquals("Wrong number of DeMuxExchanges are present in the plan",
+ expectedNumDeMuxes, StringUtils.countMatches(plan, DEMUX_EXCHANGE));
+
+ // Run the query and verify the output
+ TestBuilder testBuilder = testBuilder()
+ .sqlQuery(query)
+ .unOrdered()
+ .baselineColumns(baselineColumns);
+
+ for(Object[] baselineRecord : baselineValues) {
+ testBuilder.baselineValues(baselineRecord);
+ }
+
+ testBuilder.go();
+
+ testHelperVerifyPartitionSenderParallelization(plan, isMuxOn, isDeMuxOn);
+ }
+
+ // Verify the number of partition senders in a major fragment is not more than the cluster size and each endpoint
+ // in the cluster has at most one fragment from a given major fragment that has the partition sender.
+ private static void testHelperVerifyPartitionSenderParallelization(
+ String plan, boolean isMuxOn, boolean isDeMuxOn) throws Exception {
+
+ final DrillbitContext drillbitContext = getDrillbitContext();
+ final PhysicalPlanReader planReader = drillbitContext.getPlanReader();
+ final Fragment rootFragment = PopUnitTestBase.getRootFragmentFromPlanString(planReader, plan);
+
+ final List<Integer> deMuxFragments = Lists.newLinkedList();
+ final List<Integer> htrFragments = Lists.newLinkedList();
+ final PlanningSet planningSet = new PlanningSet();
+
+ // Create a planningSet to get the assignment of major fragment ids to fragments.
+ PARALLELIZER.initFragmentWrappers(rootFragment, planningSet);
+
+ findFragmentsWithPartitionSender(rootFragment, planningSet, deMuxFragments, htrFragments);
+
+ QueryWorkUnit qwu = PARALLELIZER.getFragments(new OptionList(), drillbitContext.getEndpoint(),
+ QueryId.getDefaultInstance(),
+ drillbitContext.getBits(), planReader, rootFragment, USER_SESSION);
+
+ // Make sure the number of minor fragments with HashPartitioner within a major fragment is not more than the
+ // number of Drillbits in cluster
+ ArrayListMultimap<Integer, DrillbitEndpoint> partitionSenderMap = ArrayListMultimap.create();
+ for(PlanFragment planFragment : qwu.getFragments()) {
+ if (planFragment.getFragmentJson().contains("hash-partition-sender")) {
+ int majorFragmentId = planFragment.getHandle().getMajorFragmentId();
+ DrillbitEndpoint assignedEndpoint = planFragment.getAssignment();
+ partitionSenderMap.get(majorFragmentId).add(assignedEndpoint);
+ }
+ }
+
+ if (isMuxOn) {
+ verifyAssignment(htrFragments, partitionSenderMap);
+ }
+
+ if (isDeMuxOn) {
+ verifyAssignment(deMuxFragments, partitionSenderMap);
+ }
+ }
+
+ /**
+ * Helper method to find the major fragment ids of fragments that have PartitionSender.
+ * A fragment can have PartitionSender if sending exchange of the current fragment is a
+ * 1. DeMux Exchange -> goes in deMuxFragments
+ * 2. HashToRandomExchange -> goes into htrFragments
+ */
+ private static void findFragmentsWithPartitionSender(Fragment currentRootFragment, PlanningSet planningSet,
+ List<Integer> deMuxFragments, List<Integer> htrFragments) {
+
+ if (currentRootFragment != null) {
+ final Exchange sendingExchange = currentRootFragment.getSendingExchange();
+ if (sendingExchange != null) {
+ final int majorFragmentId = planningSet.get(currentRootFragment).getMajorFragmentId();
+ if (sendingExchange instanceof UnorderedDeMuxExchange) {
+ deMuxFragments.add(majorFragmentId);
+ } else if (sendingExchange instanceof HashToRandomExchange) {
+ htrFragments.add(majorFragmentId);
+ }
+ }
+
+ for(ExchangeFragmentPair e : currentRootFragment.getReceivingExchangePairs()) {
+ findFragmentsWithPartitionSender(e.getNode(), planningSet, deMuxFragments, htrFragments);
+ }
+ }
+ }
+
+ /** Helper method to verify the number of PartitionSenders in the endpoint assignments of the given fragments */
+ private static void verifyAssignment(List<Integer> fragmentList,
+ ArrayListMultimap<Integer, DrillbitEndpoint> partitionSenderMap) {
+
+ // We expect at least one entry in the list
+ assertTrue(fragmentList.size() > 0);
+
+ for(Integer majorFragmentId : fragmentList) {
+ // we expect the fragment that has DeMux/HashToRandom as sending exchange to have parallelization with not more
+ // than the number of nodes in the cluster and each node in the cluster can have at most one assignment
+ List<DrillbitEndpoint> assignments = partitionSenderMap.get(majorFragmentId);
+ assertNotNull(assignments);
+ assertTrue(assignments.size() > 0);
+ assertTrue(String.format("Number of partition senders in major fragment [%d] is more than expected", majorFragmentId), CLUSTER_SIZE >= assignments.size());
+
+ // Make sure there are no duplicates in assigned endpoints (i.e at most one partition sender per endpoint)
+ assertTrue("Some endpoints have more than one fragment that has ParitionSender", ImmutableSet.copyOf(assignments).size() == assignments.size());
+ }
+ }
+
+ @AfterClass
+ public static void cleanupTempFolder() throws IOException {
+ testTempFolder.delete();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/test/java/org/apache/drill/exec/pop/PopUnitTestBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/pop/PopUnitTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/exec/pop/PopUnitTestBase.java
index 9a32ff9..3116fbb 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/pop/PopUnitTestBase.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/pop/PopUnitTestBase.java
@@ -54,11 +54,16 @@ public abstract class PopUnitTestBase extends ExecTest{
return i;
}
- public Fragment getRootFragment(PhysicalPlanReader reader, String file) throws FragmentSetupException, IOException, ForemanSetupException {
- MakeFragmentsVisitor f = new MakeFragmentsVisitor();
+ public static Fragment getRootFragment(PhysicalPlanReader reader, String file) throws FragmentSetupException,
+ IOException, ForemanSetupException {
+ return getRootFragmentFromPlanString(reader, Files.toString(FileUtils.getResourceAsFile(file), Charsets.UTF_8));
+ }
+
- PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile(file), Charsets.UTF_8));
+ public static Fragment getRootFragmentFromPlanString(PhysicalPlanReader reader, String planString)
+ throws FragmentSetupException, IOException, ForemanSetupException {
+ PhysicalPlan plan = reader.readPhysicalPlan(planString);
PhysicalOperator o = plan.getSortedOperators(false).iterator().next();
- return o.accept(f, null);
+ return o.accept(MakeFragmentsVisitor.INSTANCE, null);
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java b/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java
index 6349b76..07d310a 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java
@@ -23,7 +23,6 @@ import org.apache.drill.exec.planner.PhysicalPlanReader;
import org.apache.drill.exec.planner.fragment.Fragment;
import org.apache.drill.exec.planner.fragment.PlanningSet;
import org.apache.drill.exec.planner.fragment.SimpleParallelizer;
-import org.apache.drill.exec.planner.fragment.StatsCollector;
import org.apache.drill.exec.proto.BitControl.PlanFragment;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.UserBitShared;
@@ -35,6 +34,8 @@ import org.junit.Test;
import com.google.common.collect.Lists;
+import static org.junit.Assert.assertEquals;
+
public class TestFragmentChecker extends PopUnitTestBase{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestFragmentChecker.class);
@@ -44,12 +45,10 @@ public class TestFragmentChecker extends PopUnitTestBase{
}
- private void print(String fragmentFile, int bitCount, int exepectedFragmentCount) throws Exception{
-
+ private void print(String fragmentFile, int bitCount, int expectedFragmentCount) throws Exception{
System.out.println(String.format("=================Building plan fragments for [%s]. Allowing %d total Drillbits.==================", fragmentFile, bitCount));
PhysicalPlanReader ppr = new PhysicalPlanReader(CONFIG, CONFIG.getMapper(), DrillbitEndpoint.getDefaultInstance());
Fragment fragmentRoot = getRootFragment(ppr, fragmentFile);
- PlanningSet planningSet = StatsCollector.collectStats(fragmentRoot);
SimpleParallelizer par = new SimpleParallelizer(1000*1000, 5, 10, 1.2);
List<DrillbitEndpoint> endpoints = Lists.newArrayList();
DrillbitEndpoint localBit = null;
@@ -61,7 +60,7 @@ public class TestFragmentChecker extends PopUnitTestBase{
endpoints.add(b1);
}
- QueryWorkUnit qwu = par.getFragments(new OptionList(), localBit, QueryId.getDefaultInstance(), endpoints, ppr, fragmentRoot, planningSet,
+ QueryWorkUnit qwu = par.getFragments(new OptionList(), localBit, QueryId.getDefaultInstance(), endpoints, ppr, fragmentRoot,
UserSession.Builder.newBuilder().withCredentials(UserBitShared.UserCredentials.newBuilder().setUserName("foo").build()).build());
System.out.println(String.format("=========ROOT FRAGMENT [%d:%d] =========", qwu.getRootFragment().getHandle().getMajorFragmentId(), qwu.getRootFragment().getHandle().getMinorFragmentId()));
@@ -71,9 +70,9 @@ public class TestFragmentChecker extends PopUnitTestBase{
System.out.println(String.format("=========Fragment [%d:%d]=====", f.getHandle().getMajorFragmentId(), f.getHandle().getMinorFragmentId()));
System.out.print(f.getFragmentJson());
}
- //assertEquals(exepectedFragmentCount, qwu.getFragments().size());
- logger.debug("Planning Set {}", planningSet);
+ assertEquals(expectedFragmentCount,
+ qwu.getFragments().size() + 1 /* root fragment is not part of the getFragments() list*/);
}
@Test
http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
index 219e66f..9999be0 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
@@ -147,8 +147,7 @@ public class ParquetRecordReaderTest extends BaseTestQuery{
List<QueryResultBatch> result = testSqlWithResults("select sum(a) as total_sum from dfs.`/tmp/parquet_with_nulls_should_sum_100000_nulls_first.parquet`");
assertEquals("Only expected one batch with data, and then the empty finishing batch.", 2, result.size());
- RecordBatchLoader loader = new RecordBatchLoader(bit.getContext()
- .getAllocator());
+ RecordBatchLoader loader = new RecordBatchLoader(getDrillbitContext().getAllocator());
QueryResultBatch b = result.get(0);
loader.load(b.getHeader().getDef(), b.getData());
@@ -166,8 +165,7 @@ public class ParquetRecordReaderTest extends BaseTestQuery{
public void testNullableFilter() throws Exception {
List<QueryResultBatch> result = testSqlWithResults("select count(wr_return_quantity) as row_count from dfs.`/tmp/web_returns` where wr_return_quantity = 1");
assertEquals("Only expected one batch with data, and then the empty finishing batch.", 2, result.size());
- RecordBatchLoader loader = new RecordBatchLoader(bit.getContext()
- .getAllocator());
+ RecordBatchLoader loader = new RecordBatchLoader(getDrillbitContext().getAllocator());
QueryResultBatch b = result.get(0);
loader.load(b.getHeader().getDef(), b.getData());
http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/exec/java-exec/src/test/java/org/apache/drill/exec/util/TestArrayWrappedIntIntMap.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/util/TestArrayWrappedIntIntMap.java b/exec/java-exec/src/test/java/org/apache/drill/exec/util/TestArrayWrappedIntIntMap.java
new file mode 100644
index 0000000..8539e59
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/util/TestArrayWrappedIntIntMap.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.util;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestArrayWrappedIntIntMap {
+ @Test
+ public void testSimple() {
+ ArrayWrappedIntIntMap map = new ArrayWrappedIntIntMap();
+ map.put(0, 0);
+ map.put(1, 1);
+ map.put(9, 9);
+
+ assertEquals(0, map.get(0));
+ assertEquals(1, map.get(1));
+ assertEquals(9, map.get(9));
+ }
+
+ @Test(expected = ArrayIndexOutOfBoundsException.class)
+ public void testInvalidKeyAccess() {
+ ArrayWrappedIntIntMap map = new ArrayWrappedIntIntMap();
+ map.put(0, 0);
+ map.put(1, 1);
+ map.put(9, 9);
+
+ assertEquals(0, map.get(0));
+ assertEquals(1, map.get(1));
+ assertEquals(9, map.get(9));
+
+ assertEquals(Integer.MIN_VALUE, map.get(2));
+ map.get(256); // this should throw ArrayIndexOutOfBoundsException
+ }
+
+ @Test
+ public void testResizing() {
+ ArrayWrappedIntIntMap map = new ArrayWrappedIntIntMap();
+ int[] expectedValues = new int[] {1, 32, 64, 150, 256, 4000};
+
+ for(int i=0; i<expectedValues.length; i++) {
+ map.put(expectedValues[i], expectedValues[i]);
+ }
+
+ for(int i=0; i<expectedValues.length; i++) {
+ assertEquals(expectedValues[i], map.get(expectedValues[i]));
+ }
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testInvalidKeyBelowMinValueSupported() {
+ ArrayWrappedIntIntMap map = new ArrayWrappedIntIntMap();
+ map.put(-1, 0);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testInvalidKeyAboveMaxKeyValueSupported() {
+ ArrayWrappedIntIntMap map = new ArrayWrappedIntIntMap();
+ map.put(1 << 16, 0);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testInvalidValuePut() {
+ ArrayWrappedIntIntMap map = new ArrayWrappedIntIntMap();
+ map.put(1, -1);
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/59aae348/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 1f08574..0e56ed6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -389,6 +389,9 @@
<additionalClasspathElements>
<additionalClasspathElement>./exec/jdbc/src/test/resources/storage-plugins.json</additionalClasspathElement>
</additionalClasspathElements>
+ <systemPropertyVariables>
+ <java.io.tmpdir>${project.build.directory}</java.io.tmpdir>
+ </systemPropertyVariables>
</configuration>
</plugin>
<plugin>