You are viewing a plain text version of this content. The canonical (HTML) link for it was included in the original message but is not preserved in this plain-text extraction.
Posted to commits@drill.apache.org by bo...@apache.org on 2019/08/12 17:28:58 UTC

[drill] 03/04: DRILL-6961: Handle exceptions during queries to information_schema

This is an automated email from the ASF dual-hosted git repository.

bohdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit cb6de5a5c2ff4928ce978c0c505ad1ec1d793162
Author: Anton Gozhiy <an...@gmail.com>
AuthorDate: Wed Jul 10 19:59:15 2019 +0300

    DRILL-6961: Handle exceptions during queries to information_schema
    
    closes #1833
---
 .../exec/store/jdbc/TestJdbcPluginWithMySQLIT.java |  6 ++
 .../src/test/resources/mysql-test-data.sql         |  2 +
 .../store/kafka/schema/KafkaMessageSchema.java     | 11 ++-
 .../drill/exec/store/kafka/KafkaQueriesTest.java   |  6 ++
 .../openTSDB/client/services/ServiceImpl.java      |  8 +--
 .../drill/store/openTSDB/TestOpenTSDBPlugin.java   | 80 +++++++++++++++-------
 .../exec/store/StoragePluginRegistryImpl.java      |  6 +-
 .../store/ischema/InfoSchemaRecordGenerator.java   |  3 +-
 8 files changed, 86 insertions(+), 36 deletions(-)

diff --git a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithMySQLIT.java b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithMySQLIT.java
index cd6b4b8..0ff6894 100644
--- a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithMySQLIT.java
+++ b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithMySQLIT.java
@@ -323,4 +323,10 @@ public class TestJdbcPluginWithMySQLIT extends ClusterTest {
         .baselineValuesForSingleColumn(1, 2, 3, 5)
         .go();
   }
+
+  @Test
+  public void testInformationSchemaViews() throws Exception {
+    String query = "select * from information_schema.`views`";
+    run(query);
+  }
 }
diff --git a/contrib/storage-jdbc/src/test/resources/mysql-test-data.sql b/contrib/storage-jdbc/src/test/resources/mysql-test-data.sql
index 9a180e0..b1af4d1 100644
--- a/contrib/storage-jdbc/src/test/resources/mysql-test-data.sql
+++ b/contrib/storage-jdbc/src/test/resources/mysql-test-data.sql
@@ -91,3 +91,5 @@ insert into person (first_name, last_name, address, city, state, zip, bigint_fie
             'ZZZ');
 
 insert into person (person_id) values (5);
+
+create view person_view as select * from person;
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/schema/KafkaMessageSchema.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/schema/KafkaMessageSchema.java
index 6ab826f..034927a 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/schema/KafkaMessageSchema.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/schema/KafkaMessageSchema.java
@@ -17,12 +17,12 @@
  */
 package org.apache.drill.exec.store.kafka.schema;
 
+import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
 
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.schema.Table;
-import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.planner.logical.DrillTable;
 import org.apache.drill.exec.planner.logical.DynamicDrillTable;
 import org.apache.drill.exec.store.AbstractSchema;
@@ -30,7 +30,6 @@ import org.apache.drill.exec.store.kafka.KafkaScanSpec;
 import org.apache.drill.exec.store.kafka.KafkaStoragePlugin;
 import org.apache.drill.exec.store.kafka.KafkaStoragePluginConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.KafkaException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,7 +44,7 @@ public class KafkaMessageSchema extends AbstractSchema {
   private Set<String> tableNames;
 
   public KafkaMessageSchema(final KafkaStoragePlugin plugin, final String name) {
-    super(ImmutableList.<String> of(), name);
+    super(ImmutableList.of(), name);
     this.plugin = plugin;
   }
 
@@ -76,9 +75,9 @@ public class KafkaMessageSchema extends AbstractSchema {
     if (tableNames == null) {
       try (KafkaConsumer<?, ?> kafkaConsumer = new KafkaConsumer<>(plugin.getConfig().getKafkaConsumerProps())) {
         tableNames = kafkaConsumer.listTopics().keySet();
-      } catch(KafkaException e) {
-        throw UserException.dataReadError(e).message("Failed to get tables information").addContext(e.getMessage())
-            .build(logger);
+      } catch (Exception e) {
+        logger.warn("Failure while loading table names for database '{}': {}", getName(), e.getMessage(), e.getCause());
+        return Collections.emptySet();
       }
     }
     return tableNames;
diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java
index 51bb91d..6dc1b3e 100644
--- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java
+++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java
@@ -74,6 +74,12 @@ public class KafkaQueriesTest extends KafkaTestBase {
         .baselineValues(endOffsetsMap.get(new TopicPartition(TestQueryConstants.JSON_TOPIC, 0))-1).go();
   }
 
+  @Test
+  public void testInformationSchema() throws Exception {
+    String query = "select * from information_schema.`views`";
+    runSQL(query);
+  }
+
   private Map<TopicPartition, Long> fetchOffsets(int flag) {
     KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<>(storagePluginConfig.getKafkaConsumerProps(),
         new ByteArrayDeserializer(), new ByteArrayDeserializer());
diff --git a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/services/ServiceImpl.java b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/services/ServiceImpl.java
index 41730bd..40ed682 100644
--- a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/services/ServiceImpl.java
+++ b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/services/ServiceImpl.java
@@ -33,6 +33,7 @@ import retrofit2.converter.jackson.JacksonConverterFactory;
 import java.io.IOException;
 import java.util.ArrayList;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -101,10 +102,9 @@ public class ServiceImpl implements Service {
     try {
       return client.getAllTablesName().execute().body();
     } catch (IOException e) {
-      throw UserException.connectionError(e)
-              .message("Cannot connect to the db. " +
-                      "Maybe you have incorrect connection params or db unavailable now")
-              .build(log);
+      log.warn("Cannot connect to the db. " +
+          "Maybe you have incorrect connection params or db unavailable now: {}", e.getMessage(), e.getCause());
+      return Collections.emptySet();
     }
   }
 
diff --git a/contrib/storage-opentsdb/src/test/java/org/apache/drill/store/openTSDB/TestOpenTSDBPlugin.java b/contrib/storage-opentsdb/src/test/java/org/apache/drill/store/openTSDB/TestOpenTSDBPlugin.java
index bcff4d6..4fe9cc2 100644
--- a/contrib/storage-opentsdb/src/test/java/org/apache/drill/store/openTSDB/TestOpenTSDBPlugin.java
+++ b/contrib/storage-opentsdb/src/test/java/org/apache/drill/store/openTSDB/TestOpenTSDBPlugin.java
@@ -18,12 +18,13 @@
 package org.apache.drill.store.openTSDB;
 
 import com.github.tomakehurst.wiremock.junit.WireMockRule;
-import org.apache.drill.PlanTestBase;
 import org.apache.drill.common.exceptions.UserRemoteException;
+import org.apache.drill.exec.proto.UserBitShared.QueryType;
 import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.apache.drill.exec.store.openTSDB.OpenTSDBStoragePluginConfig;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
 import org.apache.drill.test.QueryTestUtil;
-import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Rule;
@@ -48,8 +49,9 @@ import static org.apache.drill.store.openTSDB.TestDataHolder.SAMPLE_DATA_FOR_POS
 import static org.apache.drill.store.openTSDB.TestDataHolder.SAMPLE_DATA_FOR_POST_END_REQUEST_WITHOUT_TAGS;
 import static org.apache.drill.store.openTSDB.TestDataHolder.SAMPLE_DATA_FOR_POST_END_REQUEST_WITH_TAGS;
 import static org.apache.drill.store.openTSDB.TestDataHolder.SAMPLE_DATA_FOR_POST_REQUEST_WITH_TAGS;
+import static org.junit.Assert.assertEquals;
 
-public class TestOpenTSDBPlugin extends PlanTestBase {
+public class TestOpenTSDBPlugin extends ClusterTest {
 
   private static int portNumber;
 
@@ -58,8 +60,9 @@ public class TestOpenTSDBPlugin extends PlanTestBase {
 
   @BeforeClass
   public static void setup() throws Exception {
+    startCluster(ClusterFixture.builder(dirTestWatcher));
     portNumber = QueryTestUtil.getFreePortNumber(10_000, 200);
-    final StoragePluginRegistry pluginRegistry = getDrillbitContext().getStorage();
+    final StoragePluginRegistry pluginRegistry = cluster.drillbit().getContext().getStorage();
     OpenTSDBStoragePluginConfig storagePluginConfig =
         new OpenTSDBStoragePluginConfig(String.format("http://localhost:%s", portNumber));
     storagePluginConfig.setEnabled(true);
@@ -109,11 +112,11 @@ public class TestOpenTSDBPlugin extends PlanTestBase {
             .withBody(SAMPLE_DATA_FOR_POST_DOWNSAMPLE_REQUEST_WITHOUT_TAGS)));
 
     wireMockRule.stubFor(post(urlEqualTo("/api/query"))
-            .withRequestBody(equalToJson(END_PARAM_REQUEST_WTIHOUT_TAGS))
-            .willReturn(aResponse()
-                    .withStatus(200)
-                    .withHeader("Content-Type", "application/json")
-                    .withBody(SAMPLE_DATA_FOR_POST_END_REQUEST_WITHOUT_TAGS)));
+        .withRequestBody(equalToJson(END_PARAM_REQUEST_WTIHOUT_TAGS))
+        .willReturn(aResponse()
+            .withStatus(200)
+            .withHeader("Content-Type", "application/json")
+            .withBody(SAMPLE_DATA_FOR_POST_END_REQUEST_WITHOUT_TAGS)));
 
     wireMockRule.stubFor(post(urlEqualTo("/api/query"))
         .withRequestBody(equalToJson(DOWNSAMPLE_REQUEST_WITH_TAGS))
@@ -123,11 +126,11 @@ public class TestOpenTSDBPlugin extends PlanTestBase {
             .withBody(SAMPLE_DATA_FOR_POST_DOWNSAMPLE_REQUEST_WITH_TAGS)));
 
     wireMockRule.stubFor(post(urlEqualTo("/api/query"))
-            .withRequestBody(equalToJson(END_PARAM_REQUEST_WITH_TAGS))
-            .willReturn(aResponse()
-                    .withStatus(200)
-                    .withHeader("Content-Type", "application/json")
-                    .withBody(SAMPLE_DATA_FOR_POST_END_REQUEST_WITH_TAGS)));
+        .withRequestBody(equalToJson(END_PARAM_REQUEST_WITH_TAGS))
+        .willReturn(aResponse()
+            .withStatus(200)
+            .withHeader("Content-Type", "application/json")
+            .withBody(SAMPLE_DATA_FOR_POST_END_REQUEST_WITH_TAGS)));
 
     wireMockRule.stubFor(post(urlEqualTo("/api/query"))
         .withRequestBody(equalToJson(REQUEST_TO_NONEXISTENT_METRIC))
@@ -141,48 +144,77 @@ public class TestOpenTSDBPlugin extends PlanTestBase {
   public void testBasicQueryFromWithRequiredParams() throws Exception {
     String query =
             "select * from openTSDB.`(metric=warp.speed.test, start=47y-ago, aggregator=sum)`";
-    Assert.assertEquals(18, testSql(query));
+    assertEquals(18, runQuery(query));
   }
 
   @Test
   public void testBasicQueryGroupBy() throws Exception {
     String query =
             "select `timestamp`, sum(`aggregated value`) from openTSDB.`(metric=warp.speed.test, aggregator=sum, start=47y-ago)` group by `timestamp`";
-    Assert.assertEquals(15, testSql(query));
+    assertEquals(15, runQuery(query));
   }
 
   @Test
   public void testBasicQueryFromWithInterpolationParam() throws Exception {
     String query = "select * from openTSDB.`(metric=warp.speed.test, aggregator=sum, start=47y-ago, downsample=5y-avg)`";
-    Assert.assertEquals(4, testSql(query));
+    assertEquals(4, runQuery(query));
   }
 
   @Test
   public void testBasicQueryFromWithEndParam() throws Exception {
     String query = "select * from openTSDB.`(metric=warp.speed.test, aggregator=sum, start=47y-ago, end=1407165403000))`";
-    Assert.assertEquals(5, testSql(query));
+    assertEquals(5, runQuery(query));
   }
 
   @Test(expected = UserRemoteException.class)
   public void testBasicQueryWithoutTableName() throws Exception {
-    test("select * from openTSDB.``;");
+    runQuery("select * from openTSDB.``;");
   }
 
   @Test(expected = UserRemoteException.class)
   public void testBasicQueryWithNonExistentTableName() throws Exception {
-    test("select * from openTSDB.`warp.spee`");
+    runQuery("select * from openTSDB.`warp.spee`");
   }
 
   @Test
   public void testPhysicalPlanSubmission() throws Exception {
     String query = "select * from openTSDB.`(metric=warp.speed.test, start=47y-ago, aggregator=sum)`";
-    testPhysicalPlanExecutionBasedOnQuery(query);
+    String plan = queryBuilder()
+        .sql(query)
+        .explainJson();
+    queryBuilder()
+        .query(QueryType.PHYSICAL, plan)
+        .run();
   }
 
   @Test
   public void testDescribe() throws Exception {
-    test("use openTSDB");
-    test("describe `warp.speed.test`");
-    Assert.assertEquals(1, testSql("show tables"));
+    runQuery("use openTSDB");
+    runQuery("describe `warp.speed.test`");
+    assertEquals(1, runQuery("show tables"));
+  }
+
+  @Test
+  public void testInformationSchemaWrongPluginConfig() throws Exception {
+    ClusterFixture cluster = ClusterFixture.bareBuilder(dirTestWatcher)
+        .build();
+    int portNumber = QueryTestUtil.getFreePortNumber(10_000, 200);
+    final StoragePluginRegistry pluginRegistry = cluster.drillbit().getContext().getStorage();
+    OpenTSDBStoragePluginConfig storagePluginConfig =
+        new OpenTSDBStoragePluginConfig(String.format("http://localhost:%s/", portNumber));
+    storagePluginConfig.setEnabled(true);
+    pluginRegistry.createOrUpdate(OpenTSDBStoragePluginConfig.NAME, storagePluginConfig, true);
+    String query = "select * from information_schema.`views`";
+    cluster.clientFixture()
+        .queryBuilder()
+        .sql(query)
+        .run();
+  }
+
+  private long runQuery(String query) throws Exception {
+    return queryBuilder()
+        .sql(query)
+        .run()
+        .recordCount();
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java
index 0fbfe4f..2bf6bc5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java
@@ -581,7 +581,11 @@ public class StoragePluginRegistryImpl implements StoragePluginRegistry {
 
         // finally register schemas with the refreshed plugins
         for (StoragePlugin plugin : enabledPlugins.plugins()) {
-          plugin.registerSchemas(schemaConfig, parent);
+          try {
+            plugin.registerSchemas(schemaConfig, parent);
+          } catch (Exception e) {
+            logger.warn("Error during `{}` schema initialization: {}", plugin.getName(), e.getMessage(), e.getCause());
+          }
         }
       } catch (ExecutionSetupException e) {
         throw new DrillRuntimeException("Failure while updating storage plugins", e);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaRecordGenerator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaRecordGenerator.java
index a85a7a7..2197f1a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaRecordGenerator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaRecordGenerator.java
@@ -378,8 +378,9 @@ public abstract class InfoSchemaRecordGenerator<S> {
     @Override
     public boolean visitTable(String schemaName, String tableName, Table table) {
       if (table.getJdbcTableType() == TableType.VIEW) {
+        // View's SQL may not be available for some non-Drill views, for example, JDBC view
         records.add(new Records.View(IS_CATALOG_NAME, schemaName, tableName,
-                    ((DrillViewInfoProvider) table).getViewSql()));
+            table instanceof DrillViewInfoProvider ? ((DrillViewInfoProvider) table).getViewSql() : ""));
       }
       return false;
     }