You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by su...@apache.org on 2017/02/25 07:17:58 UTC
[05/29] drill git commit: DRILL-5260: Extend "Cluster Fixture" test
framework
DRILL-5260: Extend "Cluster Fixture" test framework
- Config option to suppress printing of CSV and other output. (Allows
printing for single tests, not printing when running from Maven.)
- Parsing of query profiles to extract plan and run time information.
- Fix bug in log fixture when enabling logging for a package.
- Improved ZK support.
- Set up the new CTTAS default temporary workspace for tests.
- Clean up persistent storage files on disk to avoid CTTAS startup
failures.
- Provides a set of examples for how to use the cluster fixture.
closes #753
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/6be287df
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/6be287df
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/6be287df
Branch: refs/heads/master
Commit: 6be287dfb7ebce6a4233b37675756b4c57346ee9
Parents: 24b4e7f
Author: Paul Rogers <pr...@maprtech.com>
Authored: Fri Feb 17 17:39:20 2017 -0800
Committer: Sudheesh Katkam <su...@apache.org>
Committed: Fri Feb 24 18:41:50 2017 -0800
----------------------------------------------------------------------
common/pom.xml | 7 +-
.../java/org/apache/drill/test/DrillTest.java | 37 +-
.../drill/exec/server/RemoteServiceSet.java | 8 +-
.../ZookeeperPersistentStoreProvider.java | 6 +-
.../apache/drill/exec/util/TestUtilities.java | 2 +
.../java/org/apache/drill/BaseTestQuery.java | 9 +-
.../java/org/apache/drill/DrillTestWrapper.java | 2 +-
.../java/org/apache/drill/QueryTestUtil.java | 3 +
.../test/java/org/apache/drill/TestBuilder.java | 2 +-
.../org/apache/drill/exec/ZookeeperHelper.java | 36 +-
.../org/apache/drill/test/ClientFixture.java | 45 +-
.../org/apache/drill/test/ClusterFixture.java | 398 ++++++++++--
.../java/org/apache/drill/test/ExampleTest.java | 243 ++++++++
.../org/apache/drill/test/FixtureBuilder.java | 94 ++-
.../java/org/apache/drill/test/LogFixture.java | 4 +-
.../org/apache/drill/test/ProfileParser.java | 616 +++++++++++++++++--
.../org/apache/drill/test/QueryBuilder.java | 12 +-
exec/jdbc-all/pom.xml | 11 +-
pom.xml | 6 +
19 files changed, 1372 insertions(+), 169 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/6be287df/common/pom.xml
----------------------------------------------------------------------
diff --git a/common/pom.xml b/common/pom.xml
index 8e14531..c044a33 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -17,7 +17,7 @@
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
-
+
<parent>
<artifactId>drill-root</artifactId>
<groupId>org.apache.drill</groupId>
@@ -81,7 +81,7 @@
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</dependency>
-
+
<dependency>
<groupId>com.codahale.metrics</groupId>
<artifactId>metrics-core</artifactId>
@@ -92,7 +92,7 @@
<artifactId>metrics-servlets</artifactId>
<version>3.0.1</version>
</dependency>
-
+
<dependency>
<groupId>org.antlr</groupId>
<artifactId>antlr-runtime</artifactId>
@@ -107,7 +107,6 @@
</dependencies>
-
<build>
<plugins>
<plugin>
http://git-wip-us.apache.org/repos/asf/drill/blob/6be287df/common/src/test/java/org/apache/drill/test/DrillTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/drill/test/DrillTest.java b/common/src/test/java/org/apache/drill/test/DrillTest.java
index 18c2c1a..bb051d7 100644
--- a/common/src/test/java/org/apache/drill/test/DrillTest.java
+++ b/common/src/test/java/org/apache/drill/test/DrillTest.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -17,11 +17,13 @@
*/
package org.apache.drill.test;
+import java.io.PrintStream;
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.util.List;
+import org.apache.commons.io.output.NullOutputStream;
import org.apache.drill.common.util.DrillStringUtils;
import org.apache.drill.common.util.TestTools;
import org.junit.AfterClass;
@@ -69,6 +71,25 @@ public class DrillTest {
@Rule public TestName TEST_NAME = new TestName();
+ /**
+ * Option to cause tests to produce verbose output. Many tests provide
+ * detailed information to stdout when enabled. To enable:
+ * <p>
+ * <tt>java ... -Dtest.verbose=true ...</tt>
+ */
+ public static final String VERBOSE_OUTPUT = "test.verbose";
+
+ protected static final boolean verbose = Boolean.parseBoolean(System.getProperty(VERBOSE_OUTPUT));
+
+ /**
+ * Output destination for verbose test output. Rather than using
+ * <tt>System.out</tt>, use <tt>DrillTest.out</tt>. Output will
+ * automagically be routed to the bit bucket unless the
+ * {@link #VERBOSE_OUTPUT} flag is set.
+ */
+
+ public static final PrintStream out = verbose ? System.out : new PrintStream(new NullOutputStream());
+
@Before
public void printID() throws Exception {
System.out.printf("Running %s#%s\n", getClass().getName(), TEST_NAME.getMethodName());
@@ -113,7 +134,6 @@ public class DrillTest {
DrillStringUtils.readable(endNonHeap - startNonHeap), DrillStringUtils.readable(endNonHeap) //
);
}
-
}
private static class TestLogReporter extends TestWatcher {
@@ -148,7 +168,6 @@ public class DrillTest {
Thread.sleep(250);
}
}
-
}
public static String escapeJsonString(String original) {
@@ -188,7 +207,17 @@ public class DrillTest {
public long getMemNonHeap() {
return memoryBean.getNonHeapMemoryUsage().getUsed();
}
-
}
+ /**
+ * Reports whether verbose output has been selected for this test run.
+ *
+ * @return <tt>true</tt> if verbose output is wanted (test is likely running
+ * in a debugger), <tt>false</tt> if verbose output is to be suppressed
+ * (test is likely running in a batch Maven build).
+ */
+
+ public static boolean verbose( ) {
+ return verbose;
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/6be287df/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java
index 06bb686..91c2b20 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java
@@ -17,10 +17,8 @@
*/
package org.apache.drill.exec.server;
-import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.coord.ClusterCoordinator;
import org.apache.drill.exec.coord.local.LocalClusterCoordinator;
-import org.apache.drill.exec.memory.BufferAllocator;
public class RemoteServiceSet implements AutoCloseable {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RemoteServiceSet.class);
@@ -42,12 +40,8 @@ public class RemoteServiceSet implements AutoCloseable {
coordinator.close();
}
+ @SuppressWarnings("resource")
public static RemoteServiceSet getLocalServiceSet() {
return new RemoteServiceSet(new LocalClusterCoordinator());
}
-
- public static RemoteServiceSet getServiceSetWithFullCache(DrillConfig config, BufferAllocator allocator) throws Exception{
- return new RemoteServiceSet(new LocalClusterCoordinator());
- }
-
}
http://git-wip-us.apache.org/repos/asf/drill/blob/6be287df/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/ZookeeperPersistentStoreProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/ZookeeperPersistentStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/ZookeeperPersistentStoreProvider.java
index 58c46a7..a5502cb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/ZookeeperPersistentStoreProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/ZookeeperPersistentStoreProvider.java
@@ -31,13 +31,11 @@ import org.apache.drill.exec.store.sys.PersistentStoreConfig;
import org.apache.drill.exec.store.sys.store.LocalPersistentStore;
import org.apache.drill.exec.store.sys.store.ZookeeperPersistentStore;
import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
public class ZookeeperPersistentStoreProvider extends BasePersistentStoreProvider {
- private static final Logger logger = LoggerFactory.getLogger(ZookeeperPersistentStoreProvider.class);
+// private static final Logger logger = LoggerFactory.getLogger(ZookeeperPersistentStoreProvider.class);
- private static final String DRILL_EXEC_SYS_STORE_PROVIDER_ZK_BLOBROOT = "drill.exec.sys.store.provider.zk.blobroot";
+ public static final String DRILL_EXEC_SYS_STORE_PROVIDER_ZK_BLOBROOT = "drill.exec.sys.store.provider.zk.blobroot";
private final CuratorFramework curator;
private final DrillFileSystem fs;
http://git-wip-us.apache.org/repos/asf/drill/blob/6be287df/exec/java-exec/src/main/java/org/apache/drill/exec/util/TestUtilities.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/TestUtilities.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/TestUtilities.java
index cb687af..7215d10 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/TestUtilities.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/TestUtilities.java
@@ -64,6 +64,7 @@ public class TestUtilities {
public static void updateDfsTestTmpSchemaLocation(final StoragePluginRegistry pluginRegistry,
final String tmpDirPath)
throws ExecutionSetupException {
+ @SuppressWarnings("resource")
final FileSystemPlugin plugin = (FileSystemPlugin) pluginRegistry.getPlugin(dfsTestPluginName);
final FileSystemConfig pluginConfig = (FileSystemConfig) plugin.getConfig();
final WorkspaceConfig tmpWSConfig = pluginConfig.workspaces.get(dfsTestTmpSchema);
@@ -81,6 +82,7 @@ public class TestUtilities {
* Schema "dfs.tmp" added as part of the default bootstrap plugins file that comes with drill-java-exec jar
*/
public static void makeDfsTmpSchemaImmutable(final StoragePluginRegistry pluginRegistry) throws ExecutionSetupException {
+ @SuppressWarnings("resource")
final FileSystemPlugin dfsPlugin = (FileSystemPlugin) pluginRegistry.getPlugin(dfsPluginName);
final FileSystemConfig dfsPluginConfig = (FileSystemConfig) dfsPlugin.getConfig();
final WorkspaceConfig tmpWSConfig = dfsPluginConfig.workspaces.get(dfsTmpSchema);
http://git-wip-us.apache.org/repos/asf/drill/blob/6be287df/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
index 42cfe08..2d5f0c7 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
@@ -75,9 +75,8 @@ import org.apache.drill.exec.vector.ValueVector;
public class BaseTestQuery extends ExecTest {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseTestQuery.class);
- protected static final String TEMP_SCHEMA = "dfs_test.tmp";
+ public static final String TEMP_SCHEMA = "dfs_test.tmp";
- private static final String ENABLE_FULL_CACHE = "drill.exec.test.use-full-cache";
private static final int MAX_WIDTH_PER_NODE = 2;
@SuppressWarnings("serial")
@@ -182,11 +181,7 @@ public class BaseTestQuery extends ExecTest {
private static void openClient() throws Exception {
allocator = RootAllocatorFactory.newRoot(config);
- if (config.hasPath(ENABLE_FULL_CACHE) && config.getBoolean(ENABLE_FULL_CACHE)) {
- serviceSet = RemoteServiceSet.getServiceSetWithFullCache(config, allocator);
- } else {
- serviceSet = RemoteServiceSet.getLocalServiceSet();
- }
+ serviceSet = RemoteServiceSet.getLocalServiceSet();
dfsTestTmpSchemaLocation = TestUtilities.createTempDir();
http://git-wip-us.apache.org/repos/asf/drill/blob/6be287df/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java b/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java
index 054676d..f217632 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java
@@ -1,4 +1,4 @@
-/*******************************************************************************
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
http://git-wip-us.apache.org/repos/asf/drill/blob/6be287df/exec/java-exec/src/test/java/org/apache/drill/QueryTestUtil.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/QueryTestUtil.java b/exec/java-exec/src/test/java/org/apache/drill/QueryTestUtil.java
index 26bb4d0..54ae774 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/QueryTestUtil.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/QueryTestUtil.java
@@ -182,6 +182,7 @@ public class QueryTestUtil {
* @param srOption the scalar replacement option value to use
* @return the original scalar replacement option setting (so it can be restored)
*/
+ @SuppressWarnings("resource")
public static OptionValue setupScalarReplacementOption(
final Drillbit drillbit, final ClassTransformer.ScalarReplacementOption srOption) {
// set the system option
@@ -208,7 +209,9 @@ public class QueryTestUtil {
* @param srOption the scalar replacement option value to use
*/
public static void restoreScalarReplacementOption(final Drillbit drillbit, final OptionValue srOption) {
+ @SuppressWarnings("resource")
final DrillbitContext drillbitContext = drillbit.getContext();
+ @SuppressWarnings("resource")
final OptionManager optionManager = drillbitContext.getOptionManager();
optionManager.setOption(srOption);
http://git-wip-us.apache.org/repos/asf/drill/blob/6be287df/exec/java-exec/src/test/java/org/apache/drill/TestBuilder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/TestBuilder.java
index bef7b3b..619959b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestBuilder.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestBuilder.java
@@ -1,4 +1,4 @@
-/*******************************************************************************
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
http://git-wip-us.apache.org/repos/asf/drill/blob/6be287df/exec/java-exec/src/test/java/org/apache/drill/exec/ZookeeperHelper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/ZookeeperHelper.java b/exec/java-exec/src/test/java/org/apache/drill/exec/ZookeeperHelper.java
index 630c81b..42247ea 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/ZookeeperHelper.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/ZookeeperHelper.java
@@ -31,13 +31,21 @@ import org.apache.drill.exec.util.MiniZooKeeperCluster;
*
* <p>Tests that need a Zookeeper instance can initialize a static instance of this class in
* their {@link org.junit.BeforeClass} section to set up Zookeeper.
+ * <p>
+ * Modified to also work in the {@link ClusterFixture} class. The "bare" use sets up a
+ * Drill config. The use in the cluster fixture delegates to the cluster fixture the task
+ * of setting up the Drill config. In the "bare" case, the port number comes from the
+ * Drill config. In the cluster fixture case, we let ZK choose the port and we learn
+ * what it is. As a result, the code is a bit more cluttered than if we could just use
+ * the class for one purpose.
*/
+
public class ZookeeperHelper {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ZookeeperHelper.class);
private final File testDir = new File("target/test-data");
private final DrillConfig config;
- private final String zkUrl;
+ private String zkUrl;
private MiniZooKeeperCluster zkCluster;
/**
@@ -65,12 +73,21 @@ public class ZookeeperHelper {
config = DrillConfig.create(overrideProps);
zkUrl = config.getString(ExecConstants.ZK_CONNECTION);
- if (!testDir.exists()) {
- testDir.mkdirs();
- }
+ testDir.mkdirs();
}
/**
+ * Constructor for the cluster fixture case. Don't create a Drill config.
+ * Let ZK choose the port.
+ */
+
+ public ZookeeperHelper(String dummy) {
+ zkUrl = null;
+ config = null;
+ testDir.mkdirs();
+ }
+
+ /**
* Start the Zookeeper instance.
*
* <p>This must be used before any operations that depend on the Zookeeper instance being up.
@@ -84,8 +101,13 @@ public class ZookeeperHelper {
try {
zkCluster = new MiniZooKeeperCluster();
- zkCluster.setDefaultClientPort(Integer.parseInt(zkUrl.split(":")[1]));
+ if (zkUrl != null) {
+ zkCluster.setDefaultClientPort(Integer.parseInt(zkUrl.split(":")[1]));
+ }
zkCluster.startup(testDir, numServers);
+ if (zkUrl == null) {
+ zkUrl = "localhost:" + zkCluster.getClientPort();
+ }
} catch (IOException | InterruptedException e) {
propagate(e);
}
@@ -116,4 +138,8 @@ public class ZookeeperHelper {
public DrillConfig getConfig() {
return config;
}
+
+ public String getConnectionString( ) {
+ return zkUrl;
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/6be287df/exec/java-exec/src/test/java/org/apache/drill/test/ClientFixture.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/ClientFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/ClientFixture.java
index be36dd7..b83d5fc 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/ClientFixture.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/ClientFixture.java
@@ -29,6 +29,8 @@ import org.apache.drill.exec.client.DrillClient;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.user.QueryDataBatch;
+import org.apache.drill.exec.testing.Controls;
+import org.apache.drill.exec.testing.ControlsInjectionUtil;
import org.apache.drill.test.ClusterFixture.FixtureTestServices;
import org.apache.drill.test.QueryBuilder.QuerySummary;
@@ -56,7 +58,7 @@ public class ClientFixture implements AutoCloseable {
return this;
}
- ClientFixture build( ) {
+ public ClientFixture build( ) {
try {
return new ClientFixture(this);
} catch (RpcException e) {
@@ -77,7 +79,11 @@ public class ClientFixture implements AutoCloseable {
// Create a client.
- client = new DrillClient(cluster.config( ), cluster.serviceSet( ).getCoordinator());
+ if (cluster.usesZK()) {
+ client = new DrillClient(cluster.config( ));
+ } else {
+ client = new DrillClient(cluster.config( ), cluster.serviceSet( ).getCoordinator());
+ }
client.connect(builder.clientProps);
cluster.clients.add(this);
}
@@ -94,12 +100,12 @@ public class ClientFixture implements AutoCloseable {
* @throws RpcException
*/
- public void alterSession(String key, Object value ) throws Exception {
+ public void alterSession(String key, Object value ) {
String sql = "ALTER SESSION SET `" + key + "` = " + ClusterFixture.stringify( value );
runSqlSilently( sql );
}
- public void alterSystem(String key, Object value ) throws Exception {
+ public void alterSystem(String key, Object value ) {
String sql = "ALTER SYSTEM SET `" + key + "` = " + ClusterFixture.stringify( value );
runSqlSilently( sql );
}
@@ -111,8 +117,14 @@ public class ClientFixture implements AutoCloseable {
* @throws RpcException
*/
- public void runSqlSilently(String sql) throws Exception {
- queryBuilder().sql(sql).run();
+ public void runSqlSilently(String sql) {
+ try {
+ queryBuilder().sql(sql).run();
+ } catch (Exception e) {
+ // Should not fail during tests. Convert exception to unchecked
+ // to simplify test code.
+ new IllegalStateException(e);
+ }
}
public QueryBuilder queryBuilder() {
@@ -185,11 +197,22 @@ public class ClientFixture implements AutoCloseable {
*/
public ProfileParser parseProfile(String queryId) throws IOException {
- String tmpDir = cluster().config().getString(ExecConstants.DRILL_TMP_DIR);
- File drillTmp = new File(new File(tmpDir), "drill");
- File profileDir = new File(drillTmp, "profiles" );
- File file = new File( profileDir, queryId + ".sys.drill" );
+ File file = new File(cluster.getProfileDir(), queryId + ".sys.drill" );
return new ProfileParser(file);
}
-}
\ No newline at end of file
+ /**
+ * Set a set of injection controls that apply <b>on the next query
+ * only</b>. That query should be your target query, but may
+ * accidentally be an ALTER SESSION, EXPLAIN, etc. So, call this just
+ * before the SELECT statement.
+ *
+ * @param controls the controls string created by
+ * {@link Controls#newBuilder()} builder.
+ */
+
+ public void setControls(String controls) {
+ ControlsInjectionUtil.validateControlsString(controls);
+ alterSession(ExecConstants.DRILLBIT_CONTROL_INJECTIONS, controls);
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/6be287df/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
index f89eb01..b920edb 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
@@ -29,6 +29,7 @@ import java.util.Map.Entry;
import java.util.Properties;
import org.apache.commons.io.FileUtils;
+import org.apache.drill.BaseTestQuery;
import org.apache.drill.DrillTestWrapper.TestServices;
import org.apache.drill.QueryTestUtil;
import org.apache.drill.common.config.DrillConfig;
@@ -49,6 +50,7 @@ import org.apache.drill.exec.store.dfs.FileSystemPlugin;
import org.apache.drill.exec.store.dfs.WorkspaceConfig;
import org.apache.drill.exec.store.mock.MockStorageEngine;
import org.apache.drill.exec.store.mock.MockStorageEngineConfig;
+import org.apache.drill.exec.store.sys.store.provider.ZookeeperPersistentStoreProvider;
import org.apache.drill.exec.util.TestUtilities;
import com.google.common.base.Charsets;
@@ -64,7 +66,8 @@ import com.google.common.io.Resources;
*/
public class ClusterFixture implements AutoCloseable {
-// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ClientFixture.class);
+ // private static final org.slf4j.Logger logger =
+ // org.slf4j.LoggerFactory.getLogger(ClientFixture.class);
public static final String ENABLE_FULL_CACHE = "drill.exec.test.use-full-cache";
public static final int MAX_WIDTH_PER_NODE = 2;
@@ -75,6 +78,14 @@ public class ClusterFixture implements AutoCloseable {
// configuration. They allow tests to run successfully in Eclipse.
put(ExecConstants.SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE, false);
+
+ // The CTTAS function requires that the default temporary workspace be
+ // writable. By default, the default temporary workspace points to
+ // dfs.tmp. But, the test setup marks dfs.tmp as read-only. To work
+ // around this, tests are supposed to use dfs_test. So, we need to
+ // set the default temporary workspace to dfs_test.tmp.
+
+ put(ExecConstants.DEFAULT_TEMPORARY_WORKSPACE, BaseTestQuery.TEMP_SCHEMA);
put(ExecConstants.HTTP_ENABLE, false);
put(QueryTestUtil.TEST_QUERY_PRINTING_SILENT, true);
put("drill.catastrophic_to_standard_out", true);
@@ -99,33 +110,91 @@ public class ClusterFixture implements AutoCloseable {
// behavior. Production default is DEFAULT_SCAN_THREADS
put(ExecConstants.SCAN_THREADPOOL_SIZE, 4);
+
+ // Define a useful root location for the ZK persistent
+ // storage. Profiles will go here when running in distributed
+ // mode.
+
+ put(ZookeeperPersistentStoreProvider.DRILL_EXEC_SYS_STORE_PROVIDER_ZK_BLOBROOT, "/tmp/drill/log");
}
};
public static final String DEFAULT_BIT_NAME = "drillbit";
private DrillConfig config;
- private Map<String,Drillbit> bits = new HashMap<>();
+ private Map<String, Drillbit> bits = new HashMap<>();
private Drillbit defaultDrillbit;
private BufferAllocator allocator;
private boolean ownsZK;
private ZookeeperHelper zkHelper;
private RemoteServiceSet serviceSet;
- private String dfsTestTmpSchemaLocation;
+ private File dfsTestTempDir;
protected List<ClientFixture> clients = new ArrayList<>();
+ private boolean usesZk;
+ private boolean preserveLocalFiles;
+ private boolean isLocal;
+
+ /**
+ * Temporary directories created for this test cluster.
+ * Each is removed when closing the cluster.
+ */
+
+ private List<File> tempDirs = new ArrayList<>();
- protected ClusterFixture(FixtureBuilder builder) throws Exception {
+ ClusterFixture(FixtureBuilder builder) {
+
+ configureZk(builder);
+ try {
+ createConfig(builder);
+ allocator = RootAllocatorFactory.newRoot(config);
+ startDrillbits(builder);
+ applyOptions(builder);
+ } catch (Exception e) {
+ // Translate exceptions to unchecked to avoid cluttering
+ // tests. Failures will simply fail the test itself.
+
+ throw new IllegalStateException( "Cluster fixture setup failed", e );
+ }
+ }
+
+ private void configureZk(FixtureBuilder builder) {
// Start ZK if requested.
+ String zkConnect = null;
if (builder.zkHelper != null) {
+ // Case where the test itself started ZK and we're only using it.
+
zkHelper = builder.zkHelper;
ownsZK = false;
- } else if (builder.zkCount > 0) {
- zkHelper = new ZookeeperHelper(true);
- zkHelper.startZookeeper(builder.zkCount);
+ } else if (builder.localZkCount > 0) {
+ // Case where we need a local ZK just for this test cluster.
+
+ zkHelper = new ZookeeperHelper("dummy");
+ zkHelper.startZookeeper(builder.localZkCount);
ownsZK = true;
}
+ if (zkHelper != null) {
+ zkConnect = zkHelper.getConnectionString();
+
+ // When using ZK, we need to pass in the connection property as
+ // a config property. But, we can only do that if we are passing
+ // in config properties defined at run time. Drill does not allow
+ // combining locally-set properties and a config file: it is one
+ // or the other.
+
+ if (builder.configProps == null) {
+ throw new IllegalArgumentException("Cannot specify a local ZK while using an external config file.");
+ }
+ builder.configProperty(ExecConstants.ZK_CONNECTION, zkConnect);
+
+ // Forced to disable this, because currently we leak memory which is a known issue for query cancellations.
+ // Setting this causes unit tests to fail.
+ builder.configProperty(ExecConstants.RETURN_ERROR_FOR_FAILURE_IN_CANCELLED_FRAGMENTS, true);
+ }
+ }
+
+ private void createConfig(FixtureBuilder builder) throws Exception {
// Create a config
// Because of the way DrillConfig works, we can set the ZK
@@ -136,45 +205,60 @@ public class ClusterFixture implements AutoCloseable {
} else if (builder.configProps != null) {
config = DrillConfig.create(configProperties(builder.configProps));
} else {
- config = DrillConfig.create(configProperties(TEST_CONFIGURATIONS));
+ throw new IllegalStateException("Configuration was not provided.");
}
- // Not quite sure what this is, but some tests seem to use it.
+ if (builder.usingZk) {
+ // Distribute drillbit using ZK (in-process or external)
- if (builder.enableFullCache ||
- (config.hasPath(ENABLE_FULL_CACHE) && config.getBoolean(ENABLE_FULL_CACHE))) {
- serviceSet = RemoteServiceSet.getServiceSetWithFullCache(config, allocator);
+ serviceSet = null;
+ usesZk = true;
+ isLocal = false;
} else {
+ // Embedded Drillbit.
+
serviceSet = RemoteServiceSet.getLocalServiceSet();
+ isLocal = true;
}
+ }
- dfsTestTmpSchemaLocation = TestUtilities.createTempDir();
+ private void startDrillbits(FixtureBuilder builder) throws Exception {
+// // Ensure that Drill uses the log directory determined here rather than
+// // it's hard-coded defaults. WIP: seems to be needed some times but
+// // not others.
+//
+// String logDir = null;
+// if (builder.tempDir != null) {
+// logDir = builder.tempDir.getAbsolutePath();
+// }
+// if (logDir == null) {
+// logDir = config.getString(ExecConstants.DRILL_TMP_DIR);
+// if (logDir != null) {
+// logDir += "/drill/log";
+// }
+// }
+// if (logDir == null) {
+// logDir = "/tmp/drill";
+// }
+// new File(logDir).mkdirs();
+// System.setProperty("drill.log-dir", logDir);
+
+ dfsTestTempDir = makeTempDir("dfs-test");
+
+ // Clean up any files that may have been left from the
+ // last run.
+
+ preserveLocalFiles = builder.preserveLocalFiles;
+ removeLocalFiles();
+
+ // Start the Drillbits.
Preconditions.checkArgument(builder.bitCount > 0);
int bitCount = builder.bitCount;
for (int i = 0; i < bitCount; i++) {
- @SuppressWarnings("resource")
Drillbit bit = new Drillbit(config, serviceSet);
bit.run();
- // Create the dfs_test name space
-
- @SuppressWarnings("resource")
- final StoragePluginRegistry pluginRegistry = bit.getContext().getStorage();
- TestUtilities.updateDfsTestTmpSchemaLocation(pluginRegistry, dfsTestTmpSchemaLocation);
- TestUtilities.makeDfsTmpSchemaImmutable(pluginRegistry);
-
- // Create the mock data plugin
- // (Disabled until DRILL-5152 is committed.)
-
- MockStorageEngineConfig config = MockStorageEngineConfig.INSTANCE;
- @SuppressWarnings("resource")
- MockStorageEngine plugin = new MockStorageEngine(
- MockStorageEngineConfig.INSTANCE, bit.getContext(),
- MockStorageEngineConfig.NAME);
- ((StoragePluginRegistryImpl) pluginRegistry)
- .definePlugin(MockStorageEngineConfig.NAME, config, plugin);
-
// Bit name and registration.
String name;
@@ -189,7 +273,7 @@ public class ClusterFixture implements AutoCloseable {
if (bitCount == 1) {
name = DEFAULT_BIT_NAME;
} else {
- name = DEFAULT_BIT_NAME + Integer.toString(i+1);
+ name = DEFAULT_BIT_NAME + Integer.toString(i + 1);
}
}
bits.put(name, bit);
@@ -200,11 +284,29 @@ public class ClusterFixture implements AutoCloseable {
if (i == 0) {
defaultDrillbit = bit;
}
+ configureStoragePlugins(bit);
}
+ }
+
+ private void configureStoragePlugins(Drillbit bit) throws Exception {
+ // Create the dfs_test name space
+
+ @SuppressWarnings("resource")
+ final StoragePluginRegistry pluginRegistry = bit.getContext().getStorage();
+ TestUtilities.updateDfsTestTmpSchemaLocation(pluginRegistry, dfsTestTempDir.getAbsolutePath());
+ TestUtilities.makeDfsTmpSchemaImmutable(pluginRegistry);
- // Some operations need an allocator.
+ // Create the mock data plugin
- allocator = RootAllocatorFactory.newRoot(config);
+ MockStorageEngineConfig config = MockStorageEngineConfig.INSTANCE;
+ @SuppressWarnings("resource")
+ MockStorageEngine plugin = new MockStorageEngine(
+ MockStorageEngineConfig.INSTANCE, bit.getContext(),
+ MockStorageEngineConfig.NAME);
+ ((StoragePluginRegistryImpl) pluginRegistry).definePlugin(MockStorageEngineConfig.NAME, config, plugin);
+ }
+
+ private void applyOptions(FixtureBuilder builder) throws Exception {
// Apply system options
@@ -228,9 +330,6 @@ public class ClusterFixture implements AutoCloseable {
for (Entry<Object, Object> entry : configProps.entrySet()) {
effectiveProps.put(entry.getKey(), entry.getValue().toString());
}
- if (zkHelper != null) {
- effectiveProps.put(ExecConstants.ZK_CONNECTION, zkHelper.getConfig().getString(ExecConstants.ZK_CONNECTION));
- }
return effectiveProps;
}
@@ -240,6 +339,7 @@ public class ClusterFixture implements AutoCloseable {
public RemoteServiceSet serviceSet() { return serviceSet; }
public BufferAllocator allocator() { return allocator; }
public DrillConfig config() { return config; }
+ public File getDfsTestTmpDir() { return dfsTestTempDir; }
public ClientFixture.ClientBuilder clientBuilder() {
return new ClientFixture.ClientBuilder(this);
@@ -257,7 +357,7 @@ public class ClusterFixture implements AutoCloseable {
}
/**
- * Close the clients, drillbits, allocator and
+ * Close the clients, Drillbits, allocator and
* Zookeeper. Checks for exceptions. If an exception occurs,
* continues closing, suppresses subsequent exceptions, and
* throws the first exception at completion of close. This allows
@@ -292,11 +392,86 @@ public class ClusterFixture implements AutoCloseable {
}
}
zkHelper = null;
+
+ // Delete any local files, if we wrote to the local
+ // persistent store. But, leave the files if the user wants
+ // to review them, for debugging, say. Note that, even if the
+ // files are preserved here, they will be removed when the
+ // next cluster fixture starts, else the CTTAS initialization
+ // will fail.
+
+ if (! preserveLocalFiles) {
+ try {
+ removeLocalFiles();
+ } catch (Exception e) {
+ ex = ex == null ? e : ex;
+ }
+ }
+
+ // Remove temporary directories created for this cluster session.
+
+ try {
+ removeTempDirs();
+ } catch (Exception e) {
+ ex = ex == null ? e : ex;
+ }
+ if (ex != null) {
+ throw ex;
+ }
+ }
+
+ /**
+ * Removes files stored locally in the "local store provider."
+ * Required because CTTAS setup fails if these files are left from one
+ * run to the next.
+ *
+ * @throws IOException if a directory cannot be deleted
+ */
+
+ private void removeLocalFiles() throws IOException {
+
+ // Don't delete if this is not a local Drillbit.
+
+ if (! isLocal) {
+ return;
+ }
+
+ // Remove the local files if they exist.
+
+ String localStoreLocation = config.getString(ExecConstants.SYS_STORE_PROVIDER_LOCAL_PATH);
+ removeDir(new File(localStoreLocation));
+ }
+
+ private void removeTempDirs() throws IOException {
+ IOException ex = null;
+ for (File dir : tempDirs) {
+ try {
+ removeDir(dir);
+ } catch (IOException e) {
+ ex = ex == null ? e : ex;
+ }
+ }
if (ex != null) {
throw ex;
}
}
+ public void removeDir(File dir) throws IOException {
+ if (dir.exists()) {
+ FileUtils.deleteDirectory(dir);
+ }
+ }
+
+ /**
+ * Close a resource, suppressing the exception, and keeping
+ * only the first exception that may occur. We assume that only
+ * the first is useful, any others are probably down-stream effects
+ * of that first one.
+ *
+ * @param item Item to be closed
+ * @param ex exception to be returned if none thrown here
+ * @return the first exception found
+ */
private Exception safeClose(AutoCloseable item, Exception ex) {
try {
if (item != null) {
@@ -308,10 +483,27 @@ public class ClusterFixture implements AutoCloseable {
return ex;
}
+ /**
+ * Define a workspace within an existing storage plugin. Useful for
+ * pointing to local file system files outside the Drill source tree.
+ *
+ * @param pluginName name of the plugin like "dfs" or "dfs_test".
+ * @param schemaName name of the new schema
+ * @param path directory location (usually local)
+ * @param defaultFormat default format for files in the schema
+ */
+
public void defineWorkspace(String pluginName, String schemaName, String path,
- String defaultFormat) throws ExecutionSetupException {
+ String defaultFormat) {
for (Drillbit bit : drillbits()) {
- defineWorkspace(bit, pluginName, schemaName, path, defaultFormat);
+ try {
+ defineWorkspace(bit, pluginName, schemaName, path, defaultFormat);
+ } catch (ExecutionSetupException e) {
+ // This functionality is supposed to work in tests. Change
+ // exception to unchecked to make test code simpler.
+
+ throw new IllegalStateException(e);
+ }
}
}
@@ -341,10 +533,26 @@ public class ClusterFixture implements AutoCloseable {
;
}
+ /**
+ * Return a cluster builder without any of the usual defaults. Use
+ * this only for special cases. Your code is responsible for all the
+ * odd bits that must be set to get the setup right. See
+ * {@link ClusterFixture#TEST_CONFIGURATIONS} for details. Note that
+ * you are often better off using the defaults, then replacing selected
+ * properties with the values you prefer.
+ *
+ * @return a fixture builder with no default properties set
+ */
+
public static FixtureBuilder bareBuilder() {
return new FixtureBuilder();
}
+ /**
+ * Shim class to allow the {@link TestBuilder} class to work with the
+ * cluster fixture.
+ */
+
public static class FixtureTestServices implements TestServices {
private ClientFixture client;
@@ -370,11 +578,27 @@ public class ClusterFixture implements AutoCloseable {
}
}
- public static ClusterFixture standardCluster() throws Exception {
+ /**
+ * Return a cluster fixture built with standard options. This is a short-cut
+ * for simple tests that don't need special setup.
+ *
+ * @return a cluster fixture with standard options
+ * @throws Exception if something goes wrong
+ */
+ public static ClusterFixture standardCluster() {
return builder().build();
}
- static String stringify(Object value) {
+ /**
+ * Convert a Java object (typically a boxed scalar) to a string
+ * for use in SQL. Quotes strings but just converts others to
+ * string format.
+ *
+ * @param value the value to encode
+ * @return the SQL-acceptable string equivalent
+ */
+
+ public static String stringify(Object value) {
if (value instanceof String) {
return "'" + (String) value + "'";
} else {
@@ -387,11 +611,22 @@ public class ClusterFixture implements AutoCloseable {
final URL url = Resources.getResource(trimSlash(resource));
if (url == null) {
- throw new IOException(String.format("Unable to find resource %s.", resource));
+ throw new IOException(
+ String.format("Unable to find resource %s.", resource));
}
return Resources.toString(url, Charsets.UTF_8);
}
+ /**
+ * Load a resource file, returning the resource as a string.
+ * "Hides" the checked exception as unchecked, which is fine
+ * in a test as the unchecked exception will fail the test
+ * without unnecessary error fiddling.
+ *
+ * @param resource path to the resource
+ * @return the resource contents as a string
+ */
+
public static String loadResource(String resource) {
try {
return getResource(resource);
@@ -400,7 +635,15 @@ public class ClusterFixture implements AutoCloseable {
}
}
- static String trimSlash(String path) {
+ /**
+ * Guava likes paths to resources without an initial slash, the JDK
+ * needs a slash. Normalize the path when needed.
+ *
+ * @param path resource path with optional leading slash
+ * @return same path without the leading slash
+ */
+
+ public static String trimSlash(String path) {
if (path == null) {
return path;
} else if (path.startsWith("/")) {
@@ -411,12 +654,13 @@ public class ClusterFixture implements AutoCloseable {
}
/**
- * Create a temp directory to store the given <i>dirName</i>.
- * Directory will be deleted on exit. Directory is created if it does
- * not exist.
+ * Create a temp directory to store the given <i>dirName</i>. Directory will
+ * be deleted on exit. Directory is created if it does not exist.
+ *
* @param dirName directory name
* @return Full path including temp parent directory and given directory name.
*/
+
public static File getTempDir(final String dirName) {
final File dir = Files.createTempDir();
Runtime.getRuntime().addShutdownHook(new Thread() {
@@ -429,4 +673,62 @@ public class ClusterFixture implements AutoCloseable {
tempDir.mkdirs();
return tempDir;
}
+
+ /**
+ * Create a temporary directory which will be removed when the
+ * cluster closes.
+ *
+ * @param dirName the name of the leaf directory
+ * @return the path to the temporary directory which is usually
+ * under the temporary directory structure for this machine
+ */
+
+ public File makeTempDir(final String dirName) {
+ File dir = getTempDir(dirName);
+ tempDirs.add(dir);
+ return dir;
+ }
+
+ /**
+ * Create a temporary data directory which will be removed when the
+ * cluster closes, and register it as a "dfs" name space.
+ *
+ * @param key the name to use for the directory and the name space.
+ * Access the directory as "dfs.<key>".
+ * @param defaultFormat default storage format for the workspace
+ * @return location of the directory which can be used to create
+ * temporary input files
+ */
+
+ public File makeDataDir(String key, String defaultFormat) {
+ File dir = makeTempDir(key);
+ defineWorkspace("dfs", key, dir.getAbsolutePath(), defaultFormat);
+ return dir;
+ }
+
+ public File getDrillTempDir() {
+ return new File(config.getString(ExecConstants.SYS_STORE_PROVIDER_LOCAL_PATH));
+ }
+
+ public boolean usesZK() {
+ return usesZk;
+ }
+
+ /**
+ * Returns the directory that holds query profiles. Valid only for an
+ * embedded Drillbit with local cluster coordinator – the normal
+ * case for unit tests.
+ *
+ * @return query profile directory
+ */
+
+ public File getProfileDir() {
+ File baseDir;
+ if (usesZk) {
+ baseDir = new File(config.getString(ZookeeperPersistentStoreProvider.DRILL_EXEC_SYS_STORE_PROVIDER_ZK_BLOBROOT));
+ } else {
+ baseDir = getDrillTempDir();
+ }
+ return new File(baseDir, "profiles");
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/6be287df/exec/java-exec/src/test/java/org/apache/drill/test/ExampleTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/ExampleTest.java b/exec/java-exec/src/test/java/org/apache/drill/test/ExampleTest.java
new file mode 100644
index 0000000..a770d3e
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/ExampleTest.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.test;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.physical.impl.xsort.managed.ExternalSortBatch;
+import org.apache.drill.exec.planner.physical.PlannerSettings;
+import org.apache.drill.test.LogFixture.LogFixtureBuilder;
+import org.apache.drill.test.QueryBuilder.QuerySummary;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import ch.qos.logback.classic.Level;
+
+/**
+ * Example test case using the Drill cluster fixture. Your test case
+ * can be stand-alone (as here) or can inherit from DrillTest if you
+ * want test-by-test messages. Don't use BaseTestQuery, it will attempt
+ * to set up a Drillbit for you, which is not needed here.
+ * <p>
+ * There is nothing magic about running these items as tests, other than
+ * that JUnit makes it very easy to run one test at a time. You can also
+ * just launch the test as a Java program as shown in the <tt>main()</tt>
+ * method at the end of the file.
+ * <p>
+ * Note also that each test sets up its own Drillbit. Of course, if you
+ * have a series of test that all use the same Drilbit configuration,
+ * you can create your cluster fixture in a JUnit <tt>{@literal @}Before</tt>
+ * method, and shut it down in <tt>{@literal @}After</tt> method.
+ * <p>
+ * See {@link org.apache.drill.test.package_info the package overview} for details.
+ */
+
+// Note: Test itself is ignored because this is an example, not a
+// real test.
+
+@Ignore
+public class ExampleTest {
+
+ /**
+ * Example of the simplest possible test case: set up a default
+ * cluster (with one Drillbit), a corresponding client, run a
+ * query and print the results.
+ *
+ * @throws Exception if anything goes wrong
+ */
+
+ @Test
+ public void firstTest() throws Exception {
+ try (ClusterFixture cluster = ClusterFixture.standardCluster();
+ ClientFixture client = cluster.clientFixture()) {
+ client.queryBuilder().sql("SELECT * FROM `cp`.`employee.json` LIMIT 10").printCsv();
+ }
+ }
+
+ /**
+ * Example that uses the fixture builder to build a cluster fixture. Lets
+ * you set configuration (boot-time) options, session options, system options
+ * and more.
+ * <p>
+ * Also shows how to display the plan JSON and just run a query silently,
+ * getting just the row count, batch count and run time.
+ *
+ * @throws Exception if anything goes wrong
+ */
+
+ @Test
+ public void secondTest() throws Exception {
+ FixtureBuilder builder = ClusterFixture.builder()
+ .configProperty(ExecConstants.SLICE_TARGET, 10)
+ ;
+
+ try (ClusterFixture cluster = builder.build();
+ ClientFixture client = cluster.clientFixture()) {
+ String sql = "SELECT * FROM `cp`.`employee.json` LIMIT 10";
+ System.out.println( client.queryBuilder().sql(sql).explainJson() );
+ QuerySummary results = client.queryBuilder().sql(sql).run();
+ System.out.println(String.format("Read %d rows", results.recordCount()));
+ // Usually we want to test something. Here, just test that we got
+ // the 10 records.
+ assertEquals(10, results.recordCount());
+ }
+ }
+
+ /**
+ * Example test using the SQL mock data source. For now, we support just two
+ * column types:
+ * <ul>
+ * <li>Integer: _i</li>
+ * <li>String (Varchar): _sn, where n is the field width.</li>
+ * </ul>
+ * Row count is encoded in the table name with an optional "K" or "M"
+ * suffix for bigger row count numbers.
+ * <p>
+ * The mock data source is defined automatically by the cluster fixture.
+ * <p>
+ * There is another, more sophisticated, way to generate test data using
+ * a mock data source defined in a JSON file. We'll add an example for
+ * that later.
+ *
+ * @throws Exception if anything goes wrong
+ */
+
+ @Test
+ public void thirdTest() throws Exception {
+ try (ClusterFixture cluster = ClusterFixture.standardCluster();
+ ClientFixture client = cluster.clientFixture()) {
+ String sql = "SELECT id_i, name_s10 FROM `mock`.`employees_5`";
+ client.queryBuilder().sql(sql).printCsv();
+ }
+ }
+
+ /**
+ * Example using custom logging. Here we run a sort with trace logging enabled
+ * for just the sort class, and with logging displayed to the console.
+ * <p>
+ * This example also shows setting up a realistic set of options prior to
+ * running a query. Note that we pass in normal Java values (don't have to
+ * encode the values as a string.)
+ * <p>
+ * Finally, also shows defining your own ad-hoc local file workspace to
+ * point to a sample data file.
+ * <p>
+ * Unlike the other tests, don't actually run this one. It points to
+ * a location on a local machine. And, the query itself takes 23 minutes
+ * to run if you had the right data file...
+ *
+ * @throws Exception if anything goes wrong
+ */
+ @Test
+ public void fourthTest() throws Exception {
+ LogFixtureBuilder logBuilder = LogFixture.builder()
+ // Log to the console for debugging convenience
+ .toConsole()
+ // All debug messages in the xsort package
+ .logger("org.apache.drill.exec.physical.impl.xsort", Level.DEBUG)
+ // And trace messages for one class.
+ .logger(ExternalSortBatch.class, Level.TRACE)
+ ;
+ FixtureBuilder builder = ClusterFixture.builder()
+ // Easy way to run single threaded for easy debugging
+ .maxParallelization(1)
+ // Set some session options
+ .sessionOption(ExecConstants.MAX_QUERY_MEMORY_PER_NODE_KEY, 2L * 1024 * 1024 * 1024)
+ .sessionOption(PlannerSettings.EXCHANGE.getOptionName(), true)
+ .sessionOption(PlannerSettings.HASHAGG.getOptionName(), false)
+ ;
+
+ try (LogFixture logs = logBuilder.build();
+ ClusterFixture cluster = builder.build();
+ ClientFixture client = cluster.clientFixture()) {
+ setupFile();
+ cluster.defineWorkspace("dfs", "data", "/tmp/drill-test", "psv");
+ String sql = "select * from `dfs.data`.`example.tbl` order by columns[0]";
+ QuerySummary results = client.queryBuilder().sql(sql).run();
+ assertEquals( 2, results.recordCount() );
+ }
+ }
+
+ // Create a local file that can be pointed to by the dfs.data plugin config.
+ // In real life, you would already have the file and not need to create it
+ // here.
+
+ private void setupFile() {
+ File destFile = new File( "/tmp/drill-test/example.tbl" );
+ destFile.getParentFile().mkdirs();
+ try (PrintWriter out = new PrintWriter(new FileWriter(destFile))) {
+ out.println("20|def");
+ out.println("10|abc");
+ } catch (IOException e) {
+ fail(e.getMessage());
+ }
+ }
+
+ /**
+ * Example of a more realistic test that limits parallization, saves the query
+ * profile, parses it, and displays the runtime timing results per operator.
+ *
+ * @throws Exception if anything goes wrong
+ */
+
+ @Test
+ public void fifthTest() throws Exception {
+ FixtureBuilder builder = ClusterFixture.builder()
+ .maxParallelization(1)
+ .configProperty(ExecConstants.SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE, true)
+ ;
+
+ try (ClusterFixture cluster = builder.build();
+ ClientFixture client = cluster.clientFixture()) {
+ String sql = "SELECT id_i, name_s10 FROM `mock`.`employees_10K` ORDER BY id_i";
+
+ QuerySummary summary = client.queryBuilder().sql(sql).run();
+ System.out.println(String.format("Results: %,d records, %d batches, %,d ms", summary.recordCount(), summary.batchCount(), summary.runTimeMs() ) );
+
+ System.out.println("Query ID: " + summary.queryIdString());
+ ProfileParser profile = client.parseProfile(summary.queryIdString());
+ profile.print();
+ }
+ }
+
+ /**
+ * Example of running a specific test as Java program. Handy if you want to
+ * run the test from the command line, or if your test runs so long that JUnit
+ * would kill it with a timeout.
+ * <p>
+ * The key point is that the unit test framework has no dependencies on test
+ * classes, on JUnit annotations, etc.
+ *
+ * @param args not used
+ */
+
+ public static void main(String args) {
+ try {
+ new ExampleTest().firstTest();
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/6be287df/exec/java-exec/src/test/java/org/apache/drill/test/FixtureBuilder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/FixtureBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/test/FixtureBuilder.java
index e56f190..b1c0bee 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/FixtureBuilder.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/FixtureBuilder.java
@@ -17,6 +17,7 @@
******************************************************************************/
package org.apache.drill.test;
+import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
@@ -55,15 +56,17 @@ public class FixtureBuilder {
return props;
}
- String configResource;
- Properties configProps;
- boolean enableFullCache;
- List<RuntimeOption> sessionOptions;
- List<RuntimeOption> systemOptions;
- int bitCount = 1;
- String bitNames[];
- int zkCount;
- ZookeeperHelper zkHelper;
+ protected String configResource;
+ protected Properties configProps;
+ protected List<RuntimeOption> sessionOptions;
+ protected List<RuntimeOption> systemOptions;
+ protected int bitCount = 1;
+ protected String bitNames[];
+ protected int localZkCount;
+ protected ZookeeperHelper zkHelper;
+ protected boolean usingZk;
+ protected File tempDir;
+ protected boolean preserveLocalFiles;
/**
* Use the given configuration properties to start the embedded Drillbit.
@@ -165,11 +168,6 @@ public class FixtureBuilder {
return sessionOption(ExecConstants.MAX_WIDTH_PER_NODE_KEY, n);
}
- public FixtureBuilder enableFullCache() {
- enableFullCache = true;
- return this;
- }
-
/**
* The number of Drillbits to start in the cluster.
*
@@ -201,17 +199,22 @@ public class FixtureBuilder {
* Drillbits.
* @return this builder
*/
- public FixtureBuilder withZk() {
- return withZk(1);
+ public FixtureBuilder withLocalZk() {
+ return withLocalZk(1);
}
- public FixtureBuilder withZk(int count) {
- zkCount = count;
+ public FixtureBuilder withLocalZk(int count) {
+ localZkCount = count;
+ usingZk = true;
// Using ZK. Turn refresh wait back on.
- configProperty(ExecConstants.ZK_REFRESH, DEFAULT_ZK_REFRESH);
- return this;
+ return configProperty(ExecConstants.ZK_REFRESH, DEFAULT_ZK_REFRESH);
+ }
+
+ public FixtureBuilder withRemoteZk(String connStr) {
+ usingZk = true;
+ return configProperty(ExecConstants.ZK_CONNECTION, connStr);
}
/**
@@ -224,6 +227,7 @@ public class FixtureBuilder {
*/
public FixtureBuilder withZk(ZookeeperHelper zk) {
zkHelper = zk;
+ usingZk = true;
// Using ZK. Turn refresh wait back on.
@@ -231,6 +235,50 @@ public class FixtureBuilder {
return this;
}
+ public FixtureBuilder tempDir(File path) {
+ this.tempDir = path;
+ return this;
+ }
+
+ /**
+ * Enable saving of query profiles. The only way to save them is
+ * to enable local store provider writes, which also saves the
+ * storage plugin configs. Doing so causes the CTTAS feature to
+ * fail on the next run, so the test fixture deletes all local
+ * files on start and close, unless
+ * {@link #keepLocalFiles()} is set.
+ *
+ * @return this builder
+ */
+
+ public FixtureBuilder saveProfiles() {
+ configProperty(ExecConstants.SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE, true);
+// configProperty(ExecConstants.QUERY_PROFILE_OPTION, "sync") // Temporary until DRILL-5257 is available
+ return this;
+ }
+
+ /**
+ * Starting with the addition of the CTTAS feature, a Drillbit will
+ * not restart unless we delete all local storage files before
+ * starting the Drillbit again. In particular, the stored copies
+ * of the storage plugin configs cause the temporary workspace
+ * check to fail. Normally the cluster fixture cleans up files
+ * both before starting and after shutting down the cluster. Set this
+ * option to preserve files after shutdown, perhaps to debug the
+ * contents.
+ * <p>
+ * This clean-up is needed only if we enable local storage writes
+ * (which we must do, unfortunately, to capture and analyze
+ * storage profiles.)
+ *
+ * @return this builder
+ */
+
+ public FixtureBuilder keepLocalFiles() {
+ preserveLocalFiles = true;
+ return this;
+ }
+
/**
* Create the embedded Drillbit and client, applying the options set
* in the builder. Best to use this in a try-with-resources block:
@@ -252,9 +300,9 @@ public class FixtureBuilder {
* need them.
*
* @return
- * @throws Exception
*/
- public ClusterFixture build() throws Exception {
+
+ public ClusterFixture build() {
return new ClusterFixture(this);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/6be287df/exec/java-exec/src/test/java/org/apache/drill/test/LogFixture.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/LogFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/LogFixture.java
index d2242a1..b54b0b0 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/LogFixture.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/LogFixture.java
@@ -48,7 +48,7 @@ import ch.qos.logback.core.ConsoleAppender;
* }
* }</code></pre>
* <p>
- * You can – and should – combine the log fixtue with the
+ * You can – and should – combine the log fixture with the
* cluster and client fixtures to have complete control over your test-time
* Drill environment.
*/
@@ -194,7 +194,7 @@ public class LogFixture implements AutoCloseable {
}
private void setupConsole(LogFixtureBuilder builder) {
- Logger drillLogger = (Logger)LoggerFactory.getLogger(DRILL_PACKAGE_NAME);
+ drillLogger = (Logger)LoggerFactory.getLogger(DRILL_PACKAGE_NAME);
if (drillLogger.getAppender("STDOUT") != null) {
return;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/6be287df/exec/java-exec/src/test/java/org/apache/drill/test/ProfileParser.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/ProfileParser.java b/exec/java-exec/src/test/java/org/apache/drill/test/ProfileParser.java
index f9df768..6c99d8d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/ProfileParser.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/ProfileParser.java
@@ -21,6 +21,8 @@ import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -34,6 +36,10 @@ import javax.json.JsonObject;
import javax.json.JsonReader;
import javax.json.JsonValue;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
+
+import com.google.common.base.Preconditions;
+
/**
* Parses a query profile and provides access to various bits of the profile
* for diagnostic purposes during tests.
@@ -42,44 +48,219 @@ import javax.json.JsonValue;
public class ProfileParser {
JsonObject profile;
+ String query;
List<String> plans;
+ /**
+ * Operations sorted by operator ID. The Operator ID serves as
+ * an index into the list to get the information for that operator.
+ */
+ List<OpDefInfo> operations;
+ Map<Integer,FragInfo> fragments = new HashMap<>();
+
+ /**
+ * Operations in the original topological order as shown in the text
+ * version of the query plan in the query profile.
+ */
+ private List<OpDefInfo> topoOrder;
+
public ProfileParser( File file ) throws IOException {
try (FileReader fileReader = new FileReader(file);
JsonReader reader = Json.createReader(fileReader)) {
profile = (JsonObject) reader.read();
}
+
+ parse();
+ }
+
+ private void parse() {
+ parseQuery();
+ parsePlans();
+ buildFrags();
+ parseFragProfiles();
+ mapOpProfiles();
+ aggregateOpers();
+ buildTree();
+ }
+
+ private void parseQuery() {
+ query = profile.getString("query");
+ query = query.replace("//n", "\n");
+ }
+
+ /**
+ * Parse a text version of the plan as it appears in the JSON
+ * query profile.
+ */
+
+ private static class PlanParser {
+
+ List<String> plans = new ArrayList<>();
+ List<OpDefInfo> operations = new ArrayList<>();
+ List<OpDefInfo> sorted = new ArrayList<>();
+
+ public void parsePlans(String plan) {
+ plans = new ArrayList<>( );
+ String parts[] = plan.split("\n");
+ for (String part : parts) {
+ plans.add(part);
+ OpDefInfo opDef = new OpDefInfo( part );
+ operations.add(opDef);
+ }
+ sortList();
+ }
+
+ private void sortList() {
+ List<OpDefInfo> raw = new ArrayList<>( );
+ raw.addAll( operations );
+ Collections.sort( raw, new Comparator<OpDefInfo>() {
+ @Override
+ public int compare(OpDefInfo o1, OpDefInfo o2) {
+ int result = Integer.compare(o1.majorId, o2.majorId);
+ if ( result == 0 ) {
+ result = Integer.compare(o1.stepId, o2.stepId);
+ }
+ return result;
+ }
+ });
+ int currentFrag = 0;
+ int currentStep = 0;
+ for ( OpDefInfo opDef : raw ) {
+ if ( currentFrag < opDef.majorId ) {
+ currentFrag++;
+ OpDefInfo sender = new OpDefInfo( currentFrag, 0 );
+ sender.isInferred = true;
+ sender.name = "Sender";
+ sorted.add(sender);
+ currentStep = 1;
+ opDef.inferredParent = sender;
+ sender.children.add( opDef );
+ }
+ if ( opDef.stepId > currentStep ) {
+ OpDefInfo unknown = new OpDefInfo( currentFrag, currentStep );
+ unknown.isInferred = true;
+ unknown.name = "Unknown";
+ sorted.add(unknown);
+ opDef.inferredParent = unknown;
+ unknown.children.add( opDef );
+ }
+ sorted.add( opDef );
+ currentStep = opDef.stepId + 1;
+ }
+ }
}
+ /**
+ * Parse the plan portion of the query profile.
+ */
+
+ private void parsePlans() {
+ PlanParser parser = new PlanParser();
+ String plan = getPlan( );
+ parser.parsePlans(plan);
+ plans = parser.plans;
+ topoOrder = parser.operations;
+ operations = parser.sorted;
+ }
+
+ private void buildFrags() {
+ for (OpDefInfo opDef : operations) {
+ FragInfo major = fragments.get(opDef.majorId);
+ if (major == null) {
+ major = new FragInfo(opDef.majorId);
+ fragments.put(opDef.majorId, major);
+ }
+ major.ops.add(opDef);
+ }
+ }
+
+ private static List<FieldDef> parseCols(String cols) {
+ String parts[] = cols.split( ", " );
+ List<FieldDef> fields = new ArrayList<>( );
+ for ( String part : parts ) {
+ String halves[] = part.split( " " );
+ fields.add( new FieldDef( halves[1], halves[0] ) );
+ }
+ return fields;
+ }
+
+ private void parseFragProfiles() {
+ JsonArray frags = getFragmentProfile( );
+ for (JsonObject fragProfile : frags.getValuesAs(JsonObject.class)) {
+ int mId = fragProfile.getInt("majorFragmentId");
+ FragInfo major = fragments.get(mId);
+ major.parse(fragProfile);
+ }
+ }
+
+ private void mapOpProfiles() {
+ for (FragInfo major : fragments.values()) {
+ for (MinorFragInfo minor : major.minors) {
+ minor.mapOpProfiles(major);
+ }
+ }
+ }
+
+ /**
+ * A typical plan has many operator details across multiple
+ * minor fragments. Aggregate these totals to the "master"
+ * definition of each operator.
+ */
+
+ private void aggregateOpers() {
+ for (FragInfo major : fragments.values()) {
+ for (OpDefInfo opDef : major.ops) {
+ int sumPeak = 0;
+ for ( OperatorProfile op : opDef.opExecs) {
+ Preconditions.checkState( major.id == op.majorFragId );
+ Preconditions.checkState( opDef.stepId == op.opId );
+ opDef.actualRows += op.records;
+ opDef.actualBatches += op.batches;
+ sumPeak += op.peakMem;
+ }
+ opDef.actualMemory = sumPeak * 1024 * 1024;
+ }
+ }
+ }
+
+ /**
+ * Reconstruct the operator tree from parsed information.
+ */
+
+ public void buildTree() {
+ int currentLevel = 0;
+ OpDefInfo opStack[] = new OpDefInfo[topoOrder.size()];
+ for (OpDefInfo opDef : topoOrder) {
+ currentLevel = opDef.globalLevel;
+ opStack[currentLevel] = opDef;
+ if ( opDef.inferredParent == null ) {
+ if (currentLevel > 0) {
+ opStack[currentLevel-1].children.add(opDef);
+ }
+ } else {
+ opStack[currentLevel-1].children.add(opDef.inferredParent);
+ }
+ }
+ }
+
+
public String getQuery( ) {
- return profile.get("query").toString();
+ return profile.getString("query");
}
public String getPlan() {
- return profile.get("plan").toString();
+ return profile.getString("plan");
}
public List<String> getPlans() {
- if ( plans != null ) {
- return plans; }
- String plan = getPlan( );
- Pattern p = Pattern.compile( "(\\d\\d-\\d+[^\\\\]*)\\\\n", Pattern.MULTILINE );
- Matcher m = p.matcher(plan);
- plans = new ArrayList<>( );
- while ( m.find() ) {
- plans.add(m.group(1));
- }
return plans;
}
public List<String> getScans( ) {
List<String> scans = new ArrayList<>();
int n = getPlans( ).size();
-// Pattern p = Pattern.compile( "\\d+-\\d+\\s+(\\w+)\\(" );
for ( int i = n-1; i >= 0; i-- ) {
String plan = plans.get( i );
-// Matcher m = p.matcher( plan );
-// if ( ! m.find() ) { continue; }
if ( plan.contains( " Scan(" ) ) {
scans.add( plan );
}
@@ -120,15 +301,122 @@ public class ProfileParser {
return profile.getJsonArray("fragmentProfile");
}
- public static class OpInfo {
- int opId;
- int type;
- String name;
- long processMs;
- long waitMs;
- long setupMs;
- long peakMem;
- Map<Integer,JsonValue> metrics = new HashMap<>();
+ /**
+ * Information for a fragment, including the operators
+ * in that fragment and the set of minor fragments.
+ */
+
+ public static class FragInfo {
+ public int baseLevel;
+ public int id;
+ public List<OpDefInfo> ops = new ArrayList<>( );
+ public List<MinorFragInfo> minors = new ArrayList<>( );
+
+ public FragInfo(int majorId) {
+ this.id = majorId;
+ }
+
+ public OpDefInfo getRootOperator() {
+ return ops.get(0);
+ }
+
+ public void parse(JsonObject fragProfile) {
+ JsonArray minorList = fragProfile.getJsonArray("minorFragmentProfile");
+ for ( JsonObject minorProfile : minorList.getValuesAs(JsonObject.class) ) {
+ minors.add( new MinorFragInfo(id, minorProfile) );
+ }
+ }
+ }
+
+ /**
+ * Information about a minor fragment as parsed from the profile.
+ */
+
+ public static class MinorFragInfo {
+ public final int majorId;
+ public final int id;
+ public final List<OperatorProfile> ops = new ArrayList<>( );
+
+ public MinorFragInfo(int majorId, JsonObject minorProfile) {
+ this.majorId = majorId;
+ id = minorProfile.getInt("minorFragmentId");
+ JsonArray opList = minorProfile.getJsonArray("operatorProfile");
+ for ( JsonObject opProfile : opList.getValuesAs(JsonObject.class)) {
+ ops.add( new OperatorProfile( majorId, id, opProfile) );
+ }
+ }
+
+ public void mapOpProfiles(FragInfo major) {
+ for (OperatorProfile op : ops) {
+ OpDefInfo opDef = major.ops.get(op.opId);
+ if ( opDef == null ) {
+ System.out.println( "Can't find operator def: " + major.id + "-" + op.opId);
+ continue;
+ }
+ op.opName = CoreOperatorType.valueOf(op.type).name();
+// System.out.println( major.id + "-" + id + "-" + opDef.stepId + " - Def: " + opDef.name + " / Prof: " + op.opName );
+ op.opName = op.opName.replace("_", " ");
+ op.name = opDef.name;
+ if (op.name.equalsIgnoreCase(op.opName)) {
+ op.opName = null;
+ }
+ op.defn = opDef;
+ opDef.opName = op.opName;
+ opDef.opExecs.add(op);
+ }
+ }
+
+ }
+
+ /**
+ * Detailed information about each operator within a minor fragment
+ * for a major fragment. Gathers the detailed information from
+ * the profile.
+ */
+
+ public static class OperatorProfile {
+ public OpDefInfo defn;
+ public String opName;
+ public int majorFragId;
+ public int minorFragId;
+ public int opId;
+ public int type;
+ public String name;
+ public long processMs;
+ public long waitMs;
+ public long setupMs;
+ public long peakMem;
+ public Map<Integer,JsonNumber> metrics = new HashMap<>();
+ public long records;
+ public int batches;
+ public int schemas;
+
+ public OperatorProfile(int majorId, int minorId, JsonObject opProfile) {
+ majorFragId = majorId;
+ minorFragId = minorId;
+ opId = opProfile.getInt("operatorId");
+ type = opProfile.getInt("operatorType");
+ processMs = opProfile.getJsonNumber("processNanos").longValue() / 1_000_000;
+ waitMs = opProfile.getJsonNumber("waitNanos").longValue() / 1_000_000;
+ setupMs = opProfile.getJsonNumber("setupNanos").longValue() / 1_000_000;
+ peakMem = opProfile.getJsonNumber("peakLocalMemoryAllocated").longValue() / (1024 * 1024);
+ JsonArray array = opProfile.getJsonArray("inputProfile");
+ if (array != null) {
+ for (int i = 0; i < array.size(); i++) {
+ JsonObject obj = array.getJsonObject(i);
+ records += obj.getJsonNumber("records").longValue();
+ batches += obj.getInt("batches");
+ schemas += obj.getInt("schemas");
+ }
+ }
+ array = opProfile.getJsonArray("metric");
+ if (array != null) {
+ for (int i = 0; i < array.size(); i++) {
+ JsonObject metric = array.getJsonObject(i);
+ metrics.put(metric.getJsonNumber("metricId").intValue(), metric.getJsonNumber("longValue"));
+ }
+ }
+ }
public long getMetric(int id) {
JsonValue value = metrics.get(id);
@@ -138,9 +426,226 @@ public class ProfileParser {
}
}
- public Map<Integer,OpInfo> getOpInfo( ) {
+ /**
+ * Information about an operator definition: the plan-time information
+ * that appears in the plan portion of the profile. Also holds the
+ * "actuals" from the minor fragment portion of the profile.
+ * Allows integrating the "planned" vs. "actual" performance of the
+ * query.
+ */
+
+ public static class OpDefInfo {
+ public String opName;
+ public boolean isInferred;
+ public int majorId;
+ public int stepId;
+ public String args;
+ public List<FieldDef> columns;
+ public int globalLevel;
+ public int localLevel;
+ public int id;
+ public int branchId;
+ public boolean isBranchRoot;
+ public double estMemoryCost;
+ public double estNetCost;
+ public double estIOCost;
+ public double estCpuCost;
+ public double estRowCost;
+ public double estRows;
+ public String name;
+ public long actualMemory;
+ public int actualBatches;
+ public long actualRows;
+ public OpDefInfo inferredParent;
+ public List<OperatorProfile> opExecs = new ArrayList<>( );
+ public List<OpDefInfo> children = new ArrayList<>( );
+
+ // 00-00 Screen : rowType = RecordType(VARCHAR(10) Year, VARCHAR(65536) Month, VARCHAR(100) Devices, VARCHAR(100) Tier, VARCHAR(100) LOB, CHAR(10) Gateway, BIGINT Day, BIGINT Hour, INTEGER Week, VARCHAR(100) Week_end_date, BIGINT Usage_Cnt): \
+ // rowcount = 100.0, cumulative cost = {7.42124276972414E9 rows, 7.663067406383167E10 cpu, 0.0 io, 2.24645048816E10 network, 2.692766612982188E8 memory}, id = 129302
+ //
+ // 00-01 Project(Year=[$0], Month=[$1], Devices=[$2], Tier=[$3], LOB=[$4], Gateway=[$5], Day=[$6], Hour=[$7], Week=[$8], Week_end_date=[$9], Usage_Cnt=[$10]) :
+ // rowType = RecordType(VARCHAR(10) Year, VARCHAR(65536) Month, VARCHAR(100) Devices, VARCHAR(100) Tier, VARCHAR(100) LOB, CHAR(10) Gateway, BIGINT Day, BIGINT Hour, INTEGER Week, VARCHAR(100) Week_end_date, BIGINT Usage_Cnt): rowcount = 100.0, cumulative cost = {7.42124275972414E9 rows, 7.663067405383167E10 cpu, 0.0 io, 2.24645048816E10 network, 2.692766612982188E8 memory}, id = 129301
+
+ public OpDefInfo(String plan) {
+ Pattern p = Pattern.compile( "^(\\d+)-(\\d+)(\\s+)(\\w+)(?:\\((.*)\\))?\\s*:\\s*(.*)$" );
+ Matcher m = p.matcher(plan);
+ if (!m.matches()) {
+ throw new IllegalStateException( "Could not parse plan: " + plan );
+ }
+ majorId = Integer.parseInt(m.group(1));
+ stepId = Integer.parseInt(m.group(2));
+ name = m.group(4);
+ args = m.group(5);
+ String tail = m.group(6);
+ String indent = m.group(3);
+ globalLevel = (indent.length() - 4) / 2;
+
+ p = Pattern.compile("rowType = RecordType\\((.*)\\): (rowcount .*)");
+ m = p.matcher(tail);
+ if ( m.matches() ) {
+ columns = parseCols(m.group(1));
+ tail = m.group(2);
+ }
+
+ p = Pattern.compile( "rowcount = ([\\d.E]+), cumulative cost = \\{([\\d.E]+) rows, ([\\d.E]+) cpu, ([\\d.E]+) io, ([\\d.E]+) network, ([\\d.E]+) memory\\}, id = (\\d+)");
+ m = p.matcher(tail);
+ if (! m.matches()) {
+ throw new IllegalStateException("Could not parse costs: " + tail );
+ }
+ estRows = Double.parseDouble(m.group(1));
+ estRowCost = Double.parseDouble(m.group(2));
+ estCpuCost = Double.parseDouble(m.group(3));
+ estIOCost = Double.parseDouble(m.group(4));
+ estNetCost = Double.parseDouble(m.group(5));
+ estMemoryCost = Double.parseDouble(m.group(6));
+ id = Integer.parseInt(m.group(7));
+ }
+
+ public void printTree(String indent) {
+ new TreePrinter().visit(this);
+ }
+
+ public OpDefInfo(int major, int id) {
+ majorId = major;
+ stepId = id;
+ }
+
+ @Override
+ public String toString() {
+ String head = "[OpDefInfo " + majorId + "-" + stepId + ": " + name;
+ if ( isInferred ) {
+ head += " (" + opName + ")";
+ }
+ return head + "]";
+ }
+ }
+
+ /**
+ * Visit a tree of operator definitions to support printing,
+ * analysis and other tasks.
+ */
+
+ public static class TreeVisitor
+ {
+ public void visit(OpDefInfo root) {
+ visit(root, 0);
+ }
+ public void visit(OpDefInfo node, int indent) {
+ visitOp( node, indent );
+ if (node.children.isEmpty()) {
+ return;
+ }
+ if ( node.children.size() == 1) {
+ visit(node.children.get(0), indent);
+ return;
+ }
+ indent++;
+ int i = 0;
+ for (OpDefInfo child : node.children) {
+ visitSubtree(node, i++, indent);
+ visit(child, indent+1);
+ }
+ }
+
+ protected void visitOp(OpDefInfo node, int indent) {
+ }
+
+ protected void visitSubtree(OpDefInfo node, int i, int indent) {
+ }
+
+ public String indentString(int indent, String pad) {
+ StringBuilder buf = new StringBuilder();
+ for (int i = 0; i < indent; i++) {
+ buf.append( pad );
+ }
+ return buf.toString();
+ }
+
+ public String indentString(int indent) {
+ return indentString(indent, " ");
+ }
+
+ public String subtreeLabel(OpDefInfo node, int branch) {
+ if (node.name.equals("HashJoin")) {
+ return (branch == 0) ? "Probe" : "Build";
+ } else {
+ return "Input " + (branch + 1);
+ }
+ }
+ }
+
+ /**
+ * Print the operator tree for analysis.
+ */
+
+ public static class TreePrinter extends TreeVisitor
+ {
+ @Override
+ protected void visitOp(OpDefInfo node, int indent) {
+ System.out.print( indentString(indent) );
+ System.out.println( node.toString() );
+ }
+
+ @Override
+ protected void visitSubtree(OpDefInfo node, int i, int indent) {
+ System.out.print( indentString(indent) );
+ System.out.println(subtreeLabel(node, i));
+ }
+ }
+
+ /**
+ * Print out the tree showing a comparison of estimated vs.
+ * actual costs. Example:
+ * <p><pre>
+ * 03-05 HashJoin (HASH JOIN)
+ * Estimate: 2,521,812 rows, 1 MB
+ * Actual: 116,480 rows, 52 MB
+ * Probe
+ * 03-07 . . Project
+ * Estimate: 2,521,812 rows, 1 MB
+ * Actual: 0 rows, 0 MB
+ * </pre>
+ */
+
+ public static class CostPrinter extends TreeVisitor
+ {
+ @Override
+ protected void visitOp(OpDefInfo node, int indentLevel) {
+ System.out.print(String.format("%02d-%02d ", node.majorId, node.stepId));
+ String indent = indentString(indentLevel, ". ");
+ System.out.print( indent + node.name );
+ if (node.opName != null) {
+ System.out.print( " (" + node.opName + ")" );
+ }
+ System.out.println( );
+ indent = indentString(15);
+ System.out.print( indent );
+ System.out.println(String.format(" Estimate: %,15.0f rows, %,7.0f MB",
+ node.estRows, node.estMemoryCost / 1024 / 1024) );
+ System.out.print( indent );
+ System.out.println(String.format(" Actual: %,15d rows, %,7d MB",
+ node.actualRows, node.actualMemory / 1024 / 1024));
+ }
+
+ @Override
+ protected void visitSubtree(OpDefInfo node, int i, int indent) {
+ System.out.print( indentString(indent) + " " );
+ System.out.println(subtreeLabel(node, i));
+ }
+ }
+
+ /**
+ * We often run test queries single threaded to make analysis of the profile
+ * easier. For a single-threaded (single slice) query, get a map from
+ * operator ID to operator information as preparation for additional
+ * analysis.
+ *
+ * @return
+ */
+
+ public Map<Integer,OperatorProfile> getOpInfo( ) {
Map<Integer,String> ops = getOperators( );
- Map<Integer,OpInfo> info = new HashMap<>( );
+ Map<Integer,OperatorProfile> info = new HashMap<>( );
JsonArray frags = getFragmentProfile( );
JsonObject fragProfile = frags.getJsonObject(0).getJsonArray("minorFragmentProfile").getJsonObject(0);
JsonArray opList = fragProfile.getJsonArray("operatorProfile");
@@ -150,40 +655,56 @@ public class ProfileParser {
return info;
}
- private void parseOpProfile(Map<Integer, String> ops,
- Map<Integer, OpInfo> info, JsonObject opProfile) {
- OpInfo opInfo = new OpInfo( );
- opInfo.opId = opProfile.getInt("operatorId");
- opInfo.type = opProfile.getInt("operatorType");
- opInfo.name = ops.get(opInfo.opId);
- opInfo.processMs = opProfile.getJsonNumber("processNanos").longValue() / 1_000_000;
- opInfo.waitMs = opProfile.getJsonNumber("waitNanos").longValue() / 1_000_000;
- opInfo.setupMs = opProfile.getJsonNumber("setupNanos").longValue() / 1_000_000;
- opInfo.peakMem = opProfile.getJsonNumber("peakLocalMemoryAllocated").longValue() / (1024 * 1024);
- JsonArray array = opProfile.getJsonArray("metric");
- if (array != null) {
- for (int i = 0; i < array.size(); i++) {
- JsonObject metric = array.getJsonObject(i);
- opInfo.metrics.put(metric.getJsonNumber("metricId").intValue(), metric.get("longValue"));
+ /**
+ * For a single-slice query, get all operators of a given numeric operator
+ * type.
+ * @param type the operator type as specified in
+ * {@link org.apache.drill.exec.proto.UserBitShared.CoreOperatorType}
+ * @return a list of operators of the given type
+ */
+
+ public List<OperatorProfile> getOpsOfType(int type) {
+ List<OperatorProfile> ops = new ArrayList<>();
+ Map<Integer,OperatorProfile> opMap = getOpInfo();
+ for (OperatorProfile op : opMap.values()) {
+ if (op.type == type) {
+ ops.add(op);
}
}
+ return ops;
+ }
+
+ private void parseOpProfile(Map<Integer, String> ops,
+ Map<Integer, OperatorProfile> info, JsonObject opProfile) {
+ OperatorProfile opInfo = new OperatorProfile( 0, 0, opProfile );
+ opInfo.name = ops.get(opInfo.opId);
info.put(opInfo.opId, opInfo);
}
+ public void printPlan() {
+ new CostPrinter().visit( topoOrder.get(0) );
+ }
+
+ /**
+ * For a single-slice query, print a summary of the operator stack
+ * and costs. At present, works for a linear query with on single-input
+ * operators.
+ */
+
public void print() {
- Map<Integer, OpInfo> opInfo = getOpInfo();
+ Map<Integer, OperatorProfile> opInfo = getOpInfo();
int n = opInfo.size();
long totalSetup = 0;
long totalProcess = 0;
for ( int i = 0; i <= n; i++ ) {
- OpInfo op = opInfo.get(i);
+ OperatorProfile op = opInfo.get(i);
if ( op == null ) { continue; }
totalSetup += op.setupMs;
totalProcess += op.processMs;
}
long total = totalSetup + totalProcess;
for ( int i = 0; i <= n; i++ ) {
- OpInfo op = opInfo.get(i);
+ OperatorProfile op = opInfo.get(i);
if ( op == null ) { continue; }
System.out.print( "Op: " );
System.out.print( op.opId );
@@ -216,4 +737,13 @@ public class ProfileParser {
return Math.round(value * 100 / total );
}
+ public List<OpDefInfo> getOpDefn(String target) {
+ List<OpDefInfo> ops = new ArrayList<>( );
+ for ( OpDefInfo opDef : operations ) {
+ if ( opDef.name.startsWith( target ) ) {
+ ops.add( opDef );
+ }
+ }
+ return ops;
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/6be287df/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
index 084c28a..f9837ff 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
@@ -63,6 +63,11 @@ public class QueryBuilder {
public class SummaryOnlyQueryEventListener implements UserResultsListener {
+ /**
+ * The future to be notified. Created here and returned by the
+ * query builder.
+ */
+
private final QuerySummaryFuture future;
private QueryId queryId;
private int recordCount;
@@ -326,14 +331,13 @@ public class QueryBuilder {
* @return the number of rows returned
* @throws Exception if anything goes wrong with query execution
*/
+
public long print() throws Exception {
DrillConfig config = client.cluster().config( );
- // Note: verbose check disabled until that change is
- // committed.
- boolean verbose = ! config.getBoolean(QueryTestUtil.TEST_QUERY_PRINTING_SILENT) /* ||
- DrillTest.verbose() */;
+ boolean verbose = ! config.getBoolean(QueryTestUtil.TEST_QUERY_PRINTING_SILENT) ||
+ DrillTest.verbose();
if (verbose) {
return print(Format.TSV, VectorUtil.DEFAULT_COLUMN_WIDTH);
} else {
http://git-wip-us.apache.org/repos/asf/drill/blob/6be287df/exec/jdbc-all/pom.xml
----------------------------------------------------------------------
diff --git a/exec/jdbc-all/pom.xml b/exec/jdbc-all/pom.xml
index f9293e6..4610a92 100644
--- a/exec/jdbc-all/pom.xml
+++ b/exec/jdbc-all/pom.xml
@@ -314,6 +314,7 @@
<exclude>org.mockito:mockito-core</exclude>
<exclude>org.objenesis:objenesis</exclude>
<exclude>org.eclipse.jetty:*</exclude>
+ <exclude>commons-io:commons-io</exclude>
</excludes>
</artifactSet>
<relocations>
@@ -431,7 +432,7 @@
</filters>
</configuration>
</plugin>
-
+
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-enforcer-plugin</artifactId>
@@ -446,11 +447,11 @@
<rules>
<requireFilesSize>
<message>
-
- The file drill-jdbc-all-${project.version}.jar is outside the expected size range.
-
+
+ The file drill-jdbc-all-${project.version}.jar is outside the expected size range.
+
This is likely due to you adding new dependencies to a java-exec and not updating the excludes in this module. This is important as it minimizes the size of the dependency of Drill application users.
-
+
</message>
<maxsize>21000000</maxsize>
<minsize>15000000</minsize>
http://git-wip-us.apache.org/repos/asf/drill/blob/6be287df/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 0f7b54e..d890c2e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -576,6 +576,12 @@
<version>${dep.slf4j.version}</version>
</dependency>
+ <dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ <version>2.4</version>
+ </dependency>
+
<!-- Test Dependencies -->
<dependency>
<!-- JMockit needs to be on class path before JUnit. -->