You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by vi...@apache.org on 2021/07/27 13:21:13 UTC

[drill] branch master updated: DRILL-7975: Connection to Splunk Drill Storage Plugin fails intermittently (#2278)

This is an automated email from the ASF dual-hosted git repository.

vitalii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new 958d849  DRILL-7975: Connection to Splunk Drill Storage Plugin fails intermittently (#2278)
958d849 is described below

commit 958d849144a662a781e4d7d59adbf3300ad3bdea
Author: Vitalii Diravka <vi...@apache.org>
AuthorDate: Tue Jul 27 16:21:04 2021 +0300

    DRILL-7975: Connection to Splunk Drill Storage Plugin fails intermittently (#2278)
    
    * DRILL-7975: Connection to Splunk Drill Storage Plugin fails intermittently
    
    * Changes according to review
    
    * Removing reconnectRetries from all plugins. Removing deserializing splunk json config in tests.
    
    * Revert CI Direct Memory property: 3200Mb -> 2500Mb
---
 .../exec/store/druid/DruidStoragePluginConfig.java |  1 -
 .../apache/drill/store/kudu/TestKuduConnect.java   |  2 +-
 contrib/storage-splunk/README.md                   | 11 +++++----
 .../drill/exec/store/splunk/SplunkConnection.java  | 14 ++++++++++-
 .../exec/store/splunk/SplunkPluginConfig.java      | 10 +++++++-
 .../main/resources/bootstrap-storage-plugins.json  |  3 ++-
 .../exec/store/splunk/SplunkConnectionTest.java    |  3 ++-
 .../drill/exec/store/splunk/SplunkPluginTest.java  | 27 ++++++++++++++++++++++
 .../drill/exec/store/splunk/SplunkTestSuite.java   |  3 ++-
 .../planner/sql/handlers/ShowTablesHandler.java    |  2 +-
 .../drill/exec/store/ischema/InfoSchemaConfig.java |  1 -
 .../AbstractSecuredStoragePluginConfig.java        |  9 ++++----
 pom.xml                                            | 10 ++++++--
 13 files changed, 76 insertions(+), 20 deletions(-)

diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidStoragePluginConfig.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidStoragePluginConfig.java
index 6bf13a1..3835501 100644
--- a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidStoragePluginConfig.java
+++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidStoragePluginConfig.java
@@ -42,7 +42,6 @@ public class DruidStoragePluginConfig extends StoragePluginConfig {
     @JsonProperty("brokerAddress") String brokerAddress,
     @JsonProperty("coordinatorAddress") String coordinatorAddress,
     @JsonProperty("averageRowSizeBytes") Integer averageRowSizeBytes) {
-
     this.brokerAddress = brokerAddress;
     this.coordinatorAddress = coordinatorAddress;
     this.averageRowSizeBytes =
diff --git a/contrib/storage-kudu/src/test/java/org/apache/drill/store/kudu/TestKuduConnect.java b/contrib/storage-kudu/src/test/java/org/apache/drill/store/kudu/TestKuduConnect.java
index f75e983..3f97d3d 100644
--- a/contrib/storage-kudu/src/test/java/org/apache/drill/store/kudu/TestKuduConnect.java
+++ b/contrib/storage-kudu/src/test/java/org/apache/drill/store/kudu/TestKuduConnect.java
@@ -40,7 +40,7 @@ import org.apache.kudu.client.RowResultIterator;
 import org.apache.kudu.client.SessionConfiguration;
 import org.junit.experimental.categories.Category;
 
-@Ignore("requires remote kudu server")
+@Ignore("requires remote kudu server") // TODO: can be rewritten by leveraging kudu docker container: DRILL-7977
 @Category(KuduStorageTest.class)
 public class TestKuduConnect extends BaseTest {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestKuduConnect.class);
diff --git a/contrib/storage-splunk/README.md b/contrib/storage-splunk/README.md
index a8fd0e7..f28c4b8 100644
--- a/contrib/storage-splunk/README.md
+++ b/contrib/storage-splunk/README.md
@@ -3,21 +3,24 @@ This plugin enables Drill to query Splunk.
 
 ## Configuration
 To connect Drill to Splunk, create a new storage plugin with the following configuration:
-
-To successfully connect, Splunk uses port `8089` for interfaces.  This port must be open for Drill to query Splunk. 
-
 ```json
 {
    "type":"splunk",
+   "enabled": false,
    "username": "admin",
    "password": "changeme",
    "hostname": "localhost",
    "port": 8089,
    "earliestTime": "-14d",
    "latestTime": "now",
-   "enabled": false
+   "reconnectRetries": 3   
 }
 ```
+To successfully connect, Splunk uses port `8089` for interfaces.  This port must be open for Drill to query Splunk.
+
+Sometimes Splunk has issue in connection to it:
+https://github.com/splunk/splunk-sdk-java/issues/62 <br>
+To bypass it by Drill please specify "reconnectRetries": 3. It allows you to retry the connection several times. 
 
 ## Understanding Splunk's Data Model
 Splunk's primary use case is analyzing event logs with a timestamp. As such, data is indexed by the timestamp, with the most recent data being indexed first.  By default, Splunk
diff --git a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkConnection.java b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkConnection.java
index 05988c4..b262d7d 100644
--- a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkConnection.java
+++ b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkConnection.java
@@ -30,6 +30,8 @@ import org.apache.drill.exec.store.security.UsernamePasswordCredentials;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.concurrent.TimeUnit;
+
 /**
  * This class wraps the functionality of the Splunk connection for Drill.
  */
@@ -41,11 +43,13 @@ public class SplunkConnection {
   private final String hostname;
   private final int port;
   private Service service;
+  private int connectionAttempts;
 
   public SplunkConnection(SplunkPluginConfig config) {
     this.credentials = config.getUsernamePasswordCredentials();
     this.hostname = config.getHostname();
     this.port = config.getPort();
+    this.connectionAttempts = config.getReconnectRetries();
     service = connect();
     ConfCollection confs = service.getConfs();
   }
@@ -71,10 +75,18 @@ public class SplunkConnection {
     loginArgs.setPort(port);
     loginArgs.setPassword(credentials.getPassword());
     loginArgs.setUsername(credentials.getUsername());
-
     try {
+      connectionAttempts--;
       service = Service.connect(loginArgs);
     } catch (Exception e) {
+      if(connectionAttempts > 0) {
+        try {
+          TimeUnit.SECONDS.sleep(2);
+        } catch (InterruptedException interruptedException) {
+          logger.error("Unable to wait 2 secs before next connection trey to Splunk");
+        }
+        return connect();
+      }
       throw UserException
         .connectionError()
         .message("Unable to connect to Splunk at %s:%s", hostname, port)
diff --git a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkPluginConfig.java b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkPluginConfig.java
index c122a98..54c6564 100644
--- a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkPluginConfig.java
+++ b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkPluginConfig.java
@@ -34,12 +34,14 @@ import java.util.Objects;
 public class SplunkPluginConfig extends AbstractSecuredStoragePluginConfig {
 
   public static final String NAME = "splunk";
+  public static final int DISABLED_RECONNECT_RETRIES = 1;
 
   private final String hostname;
   private final String earliestTime;
   private final String latestTime;
 
   private final int port;
+  private final Integer reconnectRetries;
 
   @JsonCreator
   public SplunkPluginConfig(@JsonProperty("username") String username,
@@ -48,13 +50,15 @@ public class SplunkPluginConfig extends AbstractSecuredStoragePluginConfig {
                             @JsonProperty("port") int port,
                             @JsonProperty("earliestTime") String earliestTime,
                             @JsonProperty("latestTime") String latestTime,
-                            @JsonProperty("credentialsProvider") CredentialsProvider credentialsProvider) {
+                            @JsonProperty("credentialsProvider") CredentialsProvider credentialsProvider,
+                            @JsonProperty("reconnectRetries") Integer reconnectRetries) {
     super(CredentialProviderUtils.getCredentialsProvider(username, password, credentialsProvider),
         credentialsProvider == null);
     this.hostname = hostname;
     this.port = port;
     this.earliestTime = earliestTime;
     this.latestTime = latestTime == null ? "now" : latestTime;
+    this.reconnectRetries = reconnectRetries;
   }
 
   @JsonIgnore
@@ -98,6 +102,10 @@ public class SplunkPluginConfig extends AbstractSecuredStoragePluginConfig {
     return latestTime;
   }
 
+  @JsonProperty("reconnectRetries")
+  public int getReconnectRetries() {
+    return reconnectRetries != null ? reconnectRetries : DISABLED_RECONNECT_RETRIES;
+  }
 
   @Override
   public boolean equals(Object that) {
diff --git a/contrib/storage-splunk/src/main/resources/bootstrap-storage-plugins.json b/contrib/storage-splunk/src/main/resources/bootstrap-storage-plugins.json
index 8a55547..db66657 100644
--- a/contrib/storage-splunk/src/main/resources/bootstrap-storage-plugins.json
+++ b/contrib/storage-splunk/src/main/resources/bootstrap-storage-plugins.json
@@ -2,13 +2,14 @@
   "storage":{
     "splunk" : {
       "type":"splunk",
+      "enabled": false,
       "username": "admin",
       "password": "changeme",
       "hostname": "localhost",
       "port": 8089,
       "earliestTime": "-14d",
       "latestTime": "now",
-      "enabled": false
+      "reconnectRetries": 2
     }
   }
 }
diff --git a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkConnectionTest.java b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkConnectionTest.java
index 67006d5..d497c6e 100644
--- a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkConnectionTest.java
+++ b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkConnectionTest.java
@@ -49,7 +49,8 @@ public class SplunkConnectionTest extends SplunkBaseTest {
               SPLUNK_STORAGE_PLUGIN_CONFIG.getPort(),
               SPLUNK_STORAGE_PLUGIN_CONFIG.getEarliestTime(),
               SPLUNK_STORAGE_PLUGIN_CONFIG.getLatestTime(),
-              null
+              null,
+              SPLUNK_STORAGE_PLUGIN_CONFIG.getReconnectRetries()
       );
       SplunkConnection sc = new SplunkConnection(invalidSplunkConfig);
       sc.connect();
diff --git a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkPluginTest.java b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkPluginTest.java
index 254a96c..a9e7aff 100644
--- a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkPluginTest.java
+++ b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkPluginTest.java
@@ -17,6 +17,8 @@
  */
 package org.apache.drill.exec.store.splunk;
 
+import com.splunk.Service;
+import com.splunk.ServiceArgs;
 import org.apache.drill.categories.SlowTest;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.types.TypeProtos;
@@ -30,10 +32,14 @@ import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runners.MethodSorters;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
 
+import static org.apache.drill.exec.store.splunk.SplunkTestSuite.SPLUNK_STORAGE_PLUGIN_CONFIG;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Mockito.times;
 
 @FixMethodOrder(MethodSorters.JVM)
 @Category({SlowTest.class})
@@ -290,4 +296,25 @@ public class SplunkPluginTest extends SplunkBaseTest {
     int cnt = queryBuilder().physical(plan).singletonInt();
     assertEquals("Counts should match", 1, cnt);
   }
+
+  @Test
+  public void testReconnectRetries() {
+    try (MockedStatic<Service> splunk = Mockito.mockStatic(Service.class)) {
+      ServiceArgs loginArgs = new ServiceArgs();
+      loginArgs.setHost(SPLUNK_STORAGE_PLUGIN_CONFIG.getHostname());
+      loginArgs.setPort(SPLUNK_STORAGE_PLUGIN_CONFIG.getPort());
+      loginArgs.setPassword(SPLUNK_STORAGE_PLUGIN_CONFIG.getPassword());
+      loginArgs.setUsername(SPLUNK_STORAGE_PLUGIN_CONFIG.getUsername());
+      splunk.when(() -> Service.connect(loginArgs))
+          .thenThrow(new RuntimeException("Fail first connection to Splunk"))
+          .thenThrow(new RuntimeException("Fail second connection to Splunk"))
+          .thenThrow(new RuntimeException("Fail third connection to Splunk"))
+          .thenReturn(new Service(loginArgs)); // fourth connection is successful
+      new SplunkConnection(SPLUNK_STORAGE_PLUGIN_CONFIG); // it will fail, in case "reconnectRetries": 1 is specified in configs
+      splunk.verify(
+          () -> Service.connect(loginArgs),
+          times(4)
+      );
+    }
+  }
 }
diff --git a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkTestSuite.java b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkTestSuite.java
index 8e24587..23082e3 100644
--- a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkTestSuite.java
+++ b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkTestSuite.java
@@ -71,7 +71,8 @@ public class SplunkTestSuite extends ClusterTest {
         String hostname = splunk.getHost();
         Integer port = splunk.getFirstMappedPort();
         StoragePluginRegistry pluginRegistry = cluster.drillbit().getContext().getStorage();
-        SPLUNK_STORAGE_PLUGIN_CONFIG = new SplunkPluginConfig(SPLUNK_LOGIN, SPLUNK_PASS, hostname, port, "1", "now", null);
+        SPLUNK_STORAGE_PLUGIN_CONFIG = new SplunkPluginConfig(SPLUNK_LOGIN, SPLUNK_PASS, hostname, port, "1", "now",
+                null, 4);
         SPLUNK_STORAGE_PLUGIN_CONFIG.setEnabled(true);
         pluginRegistry.put(SplunkPluginConfig.NAME, SPLUNK_STORAGE_PLUGIN_CONFIG);
         runningSuite = true;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowTablesHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowTablesHandler.java
index c726d30..42cb7e9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowTablesHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowTablesHandler.java
@@ -67,7 +67,7 @@ public class ShowTablesHandler extends DefaultSqlHandler {
 
       if (schemaPlus == null) {
         throw UserException.validationError()
-            .message(String.format("Invalid schema name [%s]", SchemaUtilites.getSchemaPath(schemaNames)))
+            .message("Invalid schema name [%s]", SchemaUtilites.getSchemaPath(schemaNames))
             .build(logger);
       }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaConfig.java
index 035787e..59bfd18 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaConfig.java
@@ -22,7 +22,6 @@ import org.apache.drill.common.logical.StoragePluginConfig;
 public class InfoSchemaConfig extends StoragePluginConfig {
 
   public static final String NAME = "ischema";
-
   public static final InfoSchemaConfig INSTANCE = new InfoSchemaConfig();
 
   @Override
diff --git a/logical/src/main/java/org/apache/drill/common/logical/AbstractSecuredStoragePluginConfig.java b/logical/src/main/java/org/apache/drill/common/logical/AbstractSecuredStoragePluginConfig.java
index 441ce73..ce8d8dc 100644
--- a/logical/src/main/java/org/apache/drill/common/logical/AbstractSecuredStoragePluginConfig.java
+++ b/logical/src/main/java/org/apache/drill/common/logical/AbstractSecuredStoragePluginConfig.java
@@ -25,16 +25,15 @@ public abstract class AbstractSecuredStoragePluginConfig extends StoragePluginCo
   protected final CredentialsProvider credentialsProvider;
   protected boolean directCredentials;
 
+  public AbstractSecuredStoragePluginConfig() {
+    this(PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER,  true);
+  }
+
   public AbstractSecuredStoragePluginConfig(CredentialsProvider credentialsProvider, boolean directCredentials) {
     this.credentialsProvider = credentialsProvider;
     this.directCredentials = directCredentials;
   }
 
-  public AbstractSecuredStoragePluginConfig() {
-    this.credentialsProvider = PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER;
-    this.directCredentials = true;
-  }
-
   public CredentialsProvider getCredentialsProvider() {
     if (directCredentials) {
       return null;
diff --git a/pom.xml b/pom.xml
index f7cf7f3..f5767f4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -77,7 +77,7 @@
     <wiremock.standalone.version>2.23.2</wiremock.standalone.version>
     <jmockit.version>1.47</jmockit.version>
     <logback.version>1.2.3</logback.version>
-    <mockito.version>3.11.0</mockito.version>
+    <mockito.version>3.11.2</mockito.version>
     <!--
       Currently Hive storage plugin only supports Apache Hive 3.1.2 or vendor specific variants of the
       Apache Hive 2.3.2. If the version is changed, make sure the jars and their dependencies are updated,
@@ -1117,7 +1117,13 @@
            long as Mockito _contains_ older Hamcrest classes.  See DRILL-2130. -->
       <groupId>org.mockito</groupId>
       <artifactId>mockito-core</artifactId>
-      <version>2.23.4</version>
+      <version>${mockito.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-inline</artifactId>
+      <version>${mockito.version}</version>
       <scope>test</scope>
     </dependency>
     <dependency>