You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by bo...@apache.org on 2019/08/12 17:28:58 UTC
[drill] 03/04: DRILL-6961: Handle exceptions during queries to
information_schema
This is an automated email from the ASF dual-hosted git repository.
bohdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
commit cb6de5a5c2ff4928ce978c0c505ad1ec1d793162
Author: Anton Gozhiy <an...@gmail.com>
AuthorDate: Wed Jul 10 19:59:15 2019 +0300
DRILL-6961: Handle exceptions during queries to information_schema
closes #1833
---
.../exec/store/jdbc/TestJdbcPluginWithMySQLIT.java | 6 ++
.../src/test/resources/mysql-test-data.sql | 2 +
.../store/kafka/schema/KafkaMessageSchema.java | 11 ++-
.../drill/exec/store/kafka/KafkaQueriesTest.java | 6 ++
.../openTSDB/client/services/ServiceImpl.java | 8 +--
.../drill/store/openTSDB/TestOpenTSDBPlugin.java | 80 +++++++++++++++-------
.../exec/store/StoragePluginRegistryImpl.java | 6 +-
.../store/ischema/InfoSchemaRecordGenerator.java | 3 +-
8 files changed, 86 insertions(+), 36 deletions(-)
diff --git a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithMySQLIT.java b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithMySQLIT.java
index cd6b4b8..0ff6894 100644
--- a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithMySQLIT.java
+++ b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithMySQLIT.java
@@ -323,4 +323,10 @@ public class TestJdbcPluginWithMySQLIT extends ClusterTest {
.baselineValuesForSingleColumn(1, 2, 3, 5)
.go();
}
+
+ @Test
+ public void testInformationSchemaViews() throws Exception {
+ String query = "select * from information_schema.`views`";
+ run(query);
+ }
}
diff --git a/contrib/storage-jdbc/src/test/resources/mysql-test-data.sql b/contrib/storage-jdbc/src/test/resources/mysql-test-data.sql
index 9a180e0..b1af4d1 100644
--- a/contrib/storage-jdbc/src/test/resources/mysql-test-data.sql
+++ b/contrib/storage-jdbc/src/test/resources/mysql-test-data.sql
@@ -91,3 +91,5 @@ insert into person (first_name, last_name, address, city, state, zip, bigint_fie
'ZZZ');
insert into person (person_id) values (5);
+
+create view person_view as select * from person;
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/schema/KafkaMessageSchema.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/schema/KafkaMessageSchema.java
index 6ab826f..034927a 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/schema/KafkaMessageSchema.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/schema/KafkaMessageSchema.java
@@ -17,12 +17,12 @@
*/
package org.apache.drill.exec.store.kafka.schema;
+import java.util.Collections;
import java.util.Map;
import java.util.Set;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.Table;
-import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.planner.logical.DrillTable;
import org.apache.drill.exec.planner.logical.DynamicDrillTable;
import org.apache.drill.exec.store.AbstractSchema;
@@ -30,7 +30,6 @@ import org.apache.drill.exec.store.kafka.KafkaScanSpec;
import org.apache.drill.exec.store.kafka.KafkaStoragePlugin;
import org.apache.drill.exec.store.kafka.KafkaStoragePluginConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.KafkaException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,7 +44,7 @@ public class KafkaMessageSchema extends AbstractSchema {
private Set<String> tableNames;
public KafkaMessageSchema(final KafkaStoragePlugin plugin, final String name) {
- super(ImmutableList.<String> of(), name);
+ super(ImmutableList.of(), name);
this.plugin = plugin;
}
@@ -76,9 +75,9 @@ public class KafkaMessageSchema extends AbstractSchema {
if (tableNames == null) {
try (KafkaConsumer<?, ?> kafkaConsumer = new KafkaConsumer<>(plugin.getConfig().getKafkaConsumerProps())) {
tableNames = kafkaConsumer.listTopics().keySet();
- } catch(KafkaException e) {
- throw UserException.dataReadError(e).message("Failed to get tables information").addContext(e.getMessage())
- .build(logger);
+ } catch (Exception e) {
+ logger.warn("Failure while loading table names for database '{}': {}", getName(), e.getMessage(), e.getCause());
+ return Collections.emptySet();
}
}
return tableNames;
diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java
index 51bb91d..6dc1b3e 100644
--- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java
+++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java
@@ -74,6 +74,12 @@ public class KafkaQueriesTest extends KafkaTestBase {
.baselineValues(endOffsetsMap.get(new TopicPartition(TestQueryConstants.JSON_TOPIC, 0))-1).go();
}
+ @Test
+ public void testInformationSchema() throws Exception {
+ String query = "select * from information_schema.`views`";
+ runSQL(query);
+ }
+
private Map<TopicPartition, Long> fetchOffsets(int flag) {
KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<>(storagePluginConfig.getKafkaConsumerProps(),
new ByteArrayDeserializer(), new ByteArrayDeserializer());
diff --git a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/services/ServiceImpl.java b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/services/ServiceImpl.java
index 41730bd..40ed682 100644
--- a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/services/ServiceImpl.java
+++ b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/services/ServiceImpl.java
@@ -33,6 +33,7 @@ import retrofit2.converter.jackson.JacksonConverterFactory;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -101,10 +102,9 @@ public class ServiceImpl implements Service {
try {
return client.getAllTablesName().execute().body();
} catch (IOException e) {
- throw UserException.connectionError(e)
- .message("Cannot connect to the db. " +
- "Maybe you have incorrect connection params or db unavailable now")
- .build(log);
+ log.warn("Cannot connect to the db. " +
+ "Maybe you have incorrect connection params or db unavailable now: {}", e.getMessage(), e.getCause());
+ return Collections.emptySet();
}
}
diff --git a/contrib/storage-opentsdb/src/test/java/org/apache/drill/store/openTSDB/TestOpenTSDBPlugin.java b/contrib/storage-opentsdb/src/test/java/org/apache/drill/store/openTSDB/TestOpenTSDBPlugin.java
index bcff4d6..4fe9cc2 100644
--- a/contrib/storage-opentsdb/src/test/java/org/apache/drill/store/openTSDB/TestOpenTSDBPlugin.java
+++ b/contrib/storage-opentsdb/src/test/java/org/apache/drill/store/openTSDB/TestOpenTSDBPlugin.java
@@ -18,12 +18,13 @@
package org.apache.drill.store.openTSDB;
import com.github.tomakehurst.wiremock.junit.WireMockRule;
-import org.apache.drill.PlanTestBase;
import org.apache.drill.common.exceptions.UserRemoteException;
+import org.apache.drill.exec.proto.UserBitShared.QueryType;
import org.apache.drill.exec.store.StoragePluginRegistry;
import org.apache.drill.exec.store.openTSDB.OpenTSDBStoragePluginConfig;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
import org.apache.drill.test.QueryTestUtil;
-import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
@@ -48,8 +49,9 @@ import static org.apache.drill.store.openTSDB.TestDataHolder.SAMPLE_DATA_FOR_POS
import static org.apache.drill.store.openTSDB.TestDataHolder.SAMPLE_DATA_FOR_POST_END_REQUEST_WITHOUT_TAGS;
import static org.apache.drill.store.openTSDB.TestDataHolder.SAMPLE_DATA_FOR_POST_END_REQUEST_WITH_TAGS;
import static org.apache.drill.store.openTSDB.TestDataHolder.SAMPLE_DATA_FOR_POST_REQUEST_WITH_TAGS;
+import static org.junit.Assert.assertEquals;
-public class TestOpenTSDBPlugin extends PlanTestBase {
+public class TestOpenTSDBPlugin extends ClusterTest {
private static int portNumber;
@@ -58,8 +60,9 @@ public class TestOpenTSDBPlugin extends PlanTestBase {
@BeforeClass
public static void setup() throws Exception {
+ startCluster(ClusterFixture.builder(dirTestWatcher));
portNumber = QueryTestUtil.getFreePortNumber(10_000, 200);
- final StoragePluginRegistry pluginRegistry = getDrillbitContext().getStorage();
+ final StoragePluginRegistry pluginRegistry = cluster.drillbit().getContext().getStorage();
OpenTSDBStoragePluginConfig storagePluginConfig =
new OpenTSDBStoragePluginConfig(String.format("http://localhost:%s", portNumber));
storagePluginConfig.setEnabled(true);
@@ -109,11 +112,11 @@ public class TestOpenTSDBPlugin extends PlanTestBase {
.withBody(SAMPLE_DATA_FOR_POST_DOWNSAMPLE_REQUEST_WITHOUT_TAGS)));
wireMockRule.stubFor(post(urlEqualTo("/api/query"))
- .withRequestBody(equalToJson(END_PARAM_REQUEST_WTIHOUT_TAGS))
- .willReturn(aResponse()
- .withStatus(200)
- .withHeader("Content-Type", "application/json")
- .withBody(SAMPLE_DATA_FOR_POST_END_REQUEST_WITHOUT_TAGS)));
+ .withRequestBody(equalToJson(END_PARAM_REQUEST_WTIHOUT_TAGS))
+ .willReturn(aResponse()
+ .withStatus(200)
+ .withHeader("Content-Type", "application/json")
+ .withBody(SAMPLE_DATA_FOR_POST_END_REQUEST_WITHOUT_TAGS)));
wireMockRule.stubFor(post(urlEqualTo("/api/query"))
.withRequestBody(equalToJson(DOWNSAMPLE_REQUEST_WITH_TAGS))
@@ -123,11 +126,11 @@ public class TestOpenTSDBPlugin extends PlanTestBase {
.withBody(SAMPLE_DATA_FOR_POST_DOWNSAMPLE_REQUEST_WITH_TAGS)));
wireMockRule.stubFor(post(urlEqualTo("/api/query"))
- .withRequestBody(equalToJson(END_PARAM_REQUEST_WITH_TAGS))
- .willReturn(aResponse()
- .withStatus(200)
- .withHeader("Content-Type", "application/json")
- .withBody(SAMPLE_DATA_FOR_POST_END_REQUEST_WITH_TAGS)));
+ .withRequestBody(equalToJson(END_PARAM_REQUEST_WITH_TAGS))
+ .willReturn(aResponse()
+ .withStatus(200)
+ .withHeader("Content-Type", "application/json")
+ .withBody(SAMPLE_DATA_FOR_POST_END_REQUEST_WITH_TAGS)));
wireMockRule.stubFor(post(urlEqualTo("/api/query"))
.withRequestBody(equalToJson(REQUEST_TO_NONEXISTENT_METRIC))
@@ -141,48 +144,77 @@ public class TestOpenTSDBPlugin extends PlanTestBase {
public void testBasicQueryFromWithRequiredParams() throws Exception {
String query =
"select * from openTSDB.`(metric=warp.speed.test, start=47y-ago, aggregator=sum)`";
- Assert.assertEquals(18, testSql(query));
+ assertEquals(18, runQuery(query));
}
@Test
public void testBasicQueryGroupBy() throws Exception {
String query =
"select `timestamp`, sum(`aggregated value`) from openTSDB.`(metric=warp.speed.test, aggregator=sum, start=47y-ago)` group by `timestamp`";
- Assert.assertEquals(15, testSql(query));
+ assertEquals(15, runQuery(query));
}
@Test
public void testBasicQueryFromWithInterpolationParam() throws Exception {
String query = "select * from openTSDB.`(metric=warp.speed.test, aggregator=sum, start=47y-ago, downsample=5y-avg)`";
- Assert.assertEquals(4, testSql(query));
+ assertEquals(4, runQuery(query));
}
@Test
public void testBasicQueryFromWithEndParam() throws Exception {
String query = "select * from openTSDB.`(metric=warp.speed.test, aggregator=sum, start=47y-ago, end=1407165403000))`";
- Assert.assertEquals(5, testSql(query));
+ assertEquals(5, runQuery(query));
}
@Test(expected = UserRemoteException.class)
public void testBasicQueryWithoutTableName() throws Exception {
- test("select * from openTSDB.``;");
+ runQuery("select * from openTSDB.``;");
}
@Test(expected = UserRemoteException.class)
public void testBasicQueryWithNonExistentTableName() throws Exception {
- test("select * from openTSDB.`warp.spee`");
+ runQuery("select * from openTSDB.`warp.spee`");
}
@Test
public void testPhysicalPlanSubmission() throws Exception {
String query = "select * from openTSDB.`(metric=warp.speed.test, start=47y-ago, aggregator=sum)`";
- testPhysicalPlanExecutionBasedOnQuery(query);
+ String plan = queryBuilder()
+ .sql(query)
+ .explainJson();
+ queryBuilder()
+ .query(QueryType.PHYSICAL, plan)
+ .run();
}
@Test
public void testDescribe() throws Exception {
- test("use openTSDB");
- test("describe `warp.speed.test`");
- Assert.assertEquals(1, testSql("show tables"));
+ runQuery("use openTSDB");
+ runQuery("describe `warp.speed.test`");
+ assertEquals(1, runQuery("show tables"));
+ }
+
+ @Test
+ public void testInformationSchemaWrongPluginConfig() throws Exception {
+ ClusterFixture cluster = ClusterFixture.bareBuilder(dirTestWatcher)
+ .build();
+ int portNumber = QueryTestUtil.getFreePortNumber(10_000, 200);
+ final StoragePluginRegistry pluginRegistry = cluster.drillbit().getContext().getStorage();
+ OpenTSDBStoragePluginConfig storagePluginConfig =
+ new OpenTSDBStoragePluginConfig(String.format("http://localhost:%s/", portNumber));
+ storagePluginConfig.setEnabled(true);
+ pluginRegistry.createOrUpdate(OpenTSDBStoragePluginConfig.NAME, storagePluginConfig, true);
+ String query = "select * from information_schema.`views`";
+ cluster.clientFixture()
+ .queryBuilder()
+ .sql(query)
+ .run();
+ }
+
+ private long runQuery(String query) throws Exception {
+ return queryBuilder()
+ .sql(query)
+ .run()
+ .recordCount();
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java
index 0fbfe4f..2bf6bc5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java
@@ -581,7 +581,11 @@ public class StoragePluginRegistryImpl implements StoragePluginRegistry {
// finally register schemas with the refreshed plugins
for (StoragePlugin plugin : enabledPlugins.plugins()) {
- plugin.registerSchemas(schemaConfig, parent);
+ try {
+ plugin.registerSchemas(schemaConfig, parent);
+ } catch (Exception e) {
+ logger.warn("Error during `{}` schema initialization: {}", plugin.getName(), e.getMessage(), e.getCause());
+ }
}
} catch (ExecutionSetupException e) {
throw new DrillRuntimeException("Failure while updating storage plugins", e);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaRecordGenerator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaRecordGenerator.java
index a85a7a7..2197f1a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaRecordGenerator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaRecordGenerator.java
@@ -378,8 +378,9 @@ public abstract class InfoSchemaRecordGenerator<S> {
@Override
public boolean visitTable(String schemaName, String tableName, Table table) {
if (table.getJdbcTableType() == TableType.VIEW) {
+ // View's SQL may not be available for some non-Drill views, for example, JDBC view
records.add(new Records.View(IS_CATALOG_NAME, schemaName, tableName,
- ((DrillViewInfoProvider) table).getViewSql()));
+ table instanceof DrillViewInfoProvider ? ((DrillViewInfoProvider) table).getViewSql() : ""));
}
return false;
}