Posted to commits@zeppelin.apache.org by al...@apache.org on 2020/05/01 11:28:34 UTC
[zeppelin] branch master updated: [ZEPPELIN-4378] porting Cassandra interpreter to new driver
This is an automated email from the ASF dual-hosted git repository.
alexott pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push:
new 5f6a5d5 [ZEPPELIN-4378] porting Cassandra interpreter to new driver
5f6a5d5 is described below
commit 5f6a5d58fd87812281349ea01cb8b79c75c3ef40
Author: Alex Ott <al...@gmail.com>
AuthorDate: Wed Mar 25 18:12:23 2020 +0100
[ZEPPELIN-4378] porting Cassandra interpreter to new driver
### What is this PR for?
This PR refactors the Cassandra interpreter to use the new DataStax Java driver (4.x). The driver's new features and new architecture required significant changes in the code base, but as a result we get:
* A supported version of the driver (the 3.x line will only receive critical bug fixes)
* Support for additional DSE (DataStax Enterprise) functionality: security, additional data types, etc.
* Improvements in load balancing, etc.
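As one concrete instance of the code changes this refactoring required, the `open()` hunk in the diff below builds a collection of `InetSocketAddress` contact points and leaves host names unresolved (the commit also sets `RESOLVE_CONTACT_POINTS` to `false`). The following is a minimal, stdlib-only sketch of that idea, not the committed code: the class name `ContactPoints`, the IPv4-only regex, and the whitespace trimming are assumptions made for illustration, whereas the commit relies on the driver's shaded Guava `InetAddresses.isInetAddress`.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper for illustration only; not part of the commit.
final class ContactPoints {
    // Simplified IPv4 literal check (assumption). The commit uses the driver's
    // shaded Guava InetAddresses.isInetAddress, which also accepts IPv6 literals.
    private static final Pattern IPV4 = Pattern.compile(
        "^(25[0-5]|2[0-4]\\d|1?\\d?\\d)(\\.(25[0-5]|2[0-4]\\d|1?\\d?\\d)){3}$");

    // Splits a comma-separated host list into socket addresses.
    // IP literals become resolved addresses; host names stay unresolved.
    static List<InetSocketAddress> parse(String hosts, int port) {
        List<InetSocketAddress> result = new ArrayList<>();
        for (String raw : hosts.split(",")) {
            String address = raw.trim(); // trimming is an addition of this sketch
            if (address.isEmpty()) {
                continue; // skipping empty entries is also an addition of this sketch
            }
            if (IPV4.matcher(address).matches()) {
                // An IP literal needs no DNS lookup, so this address is resolved.
                result.add(new InetSocketAddress(address, port));
            } else {
                // Defer name resolution instead of resolving at construction time.
                result.add(InetSocketAddress.createUnresolved(address, port));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        for (InetSocketAddress a : parse("127.0.0.1,cassandra.example.com", 9042)) {
            System.out.println(a.getHostString() + ":" + a.getPort()
                + " unresolved=" + a.isUnresolved());
        }
    }
}
```

In the commit, the resulting collection is handed to `CqlSession.builder().addContactPoints(hosts)`.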
### What type of PR is it?
Refactoring
### What is the Jira issue?
* ZEPPELIN-4378
### How should this be tested?
* Travis CI build: https://travis-ci.org/github/alexott/zeppelin/builds/681878444
* Was tested manually in addition to the unit tests
### Questions:
* Do the license files need an update? Done, minor updates.
* Are there breaking changes for older versions? Some configuration properties were renamed or deleted, but this doesn't break existing setups. The only breaking change is the removal of the "standard" retry policies from the syntax, which was necessary because they are no longer shipped with the driver.
* Does this need documentation? The documentation was updated.
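One pattern in the diff that helps when a setting is absent is replacing `getProperty(name)` with `getProperty(name, default)` before calling `parseInt`. The sketch below shows the same pattern with plain `java.util.Properties`; the class name `ParallelismSetting` is hypothetical, while the property name and the default of `"10"` are taken from the diff.

```java
import java.util.Properties;

// Hypothetical illustration of the "property with default" pattern; not the committed code.
final class ParallelismSetting {
    // Property name and default value as they appear in the diff.
    static final String KEY = "cassandra.interpreter.parallelism";
    static final String DEFAULT_PARALLELISM = "10";

    // Falls back to the default when the property is missing, so
    // Integer.parseInt never receives null.
    static int parallelism(Properties props) {
        return Integer.parseInt(props.getProperty(KEY, DEFAULT_PARALLELISM));
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        System.out.println(parallelism(props)); // property missing: default is used
        props.setProperty(KEY, "4");
        System.out.println(parallelism(props)); // explicit value wins
    }
}
```

Without the default, a missing property would reach `Integer.parseInt` as `null` and raise a `NumberFormatException`.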
Author: Alex Ott <al...@gmail.com>
Closes #3699 from alexott/ZEPPELIN-4378 and squashes the following commits:
1c9145379 [Alex Ott] [ZEPPELIN-4378] porting Cassandra interpreter to new driver
---
cassandra/pom.xml | 62 +-
.../zeppelin/cassandra/CassandraInterpreter.java | 174 +++--
.../src/main/resources/interpreter-setting.json | 68 +-
.../src/main/resources/scalate/allAggregates.ssp | 2 +-
.../src/main/resources/scalate/allFunctions.ssp | 2 +-
.../resources/scalate/allMaterializedViews.ssp | 2 +-
cassandra/src/main/resources/scalate/allTables.ssp | 2 +-
cassandra/src/main/resources/scalate/allUDTs.ssp | 2 +-
cassandra/src/main/resources/scalate/helpMenu.ssp | 36 +-
.../resources/scalate/materializedViewDetails.ssp | 10 +-
.../scalate/noResultWithExecutionInfo.ssp | 7 +-
.../src/main/resources/scalate/tableDetails.ssp | 10 +-
.../src/main/resources/scalate/udtDetails.ssp | 2 +-
.../driver/core/TableMetadataWrapper.scala | 4 +-
.../zeppelin/cassandra/BoundValuesParser.scala | 9 +-
.../apache/zeppelin/cassandra/DisplaySystem.scala | 491 +++++++-------
.../zeppelin/cassandra/EnhancedSession.scala | 215 +++---
.../zeppelin/cassandra/InterpreterLogic.scala | 279 ++++----
.../zeppelin/cassandra/JavaDriverConfig.scala | 397 +++--------
.../zeppelin/cassandra/MetaDataHierarchy.scala | 52 +-
.../zeppelin/cassandra/ParagraphParser.scala | 247 +++----
.../zeppelin/cassandra/TextBlockHierarchy.scala | 14 +-
.../scala/compat/java8/OptionConverters.scala | 140 ++++
.../cassandra/CassandraInterpreterTest.java | 255 ++++---
.../zeppelin/cassandra/InterpreterLogicTest.java | 103 ++-
cassandra/src/test/resources/application.conf | 3 +
.../{prepare_schema.cql => prepare_all.cql} | 21 +-
cassandra/src/test/resources/prepare_data.cql | 18 -
.../scalate/DescribeKeyspace_live_data.html | 2 +-
.../test/resources/scalate/DescribeKeyspaces.html | 2 +-
.../DescribeTable_live_data_complex_table.html | 2 +-
.../src/test/resources/scalate/DescribeTables.html | 750 +++++++++++++--------
.../src/test/resources/scalate/DescribeTypes.html | 179 +++++
cassandra/src/test/resources/scalate/Help.html | 2 +-
.../scalate/NoResultWithExecutionInfo.html | 43 +-
.../zeppelin/cassandra/EnhancedSessionTest.scala | 18 +-
.../zeppelin/cassandra/ParagraphParserTest.scala | 45 +-
docs/interpreter/cassandra.md | 108 +--
zeppelin-distribution/src/bin_license/LICENSE | 6 +-
39 files changed, 1962 insertions(+), 1822 deletions(-)
diff --git a/cassandra/pom.xml b/cassandra/pom.xml
index 6424f6c..6d37359 100644
--- a/cassandra/pom.xml
+++ b/cassandra/pom.xml
@@ -33,15 +33,15 @@
<description>Zeppelin cassandra support</description>
<properties>
- <cassandra.driver.version>3.7.2</cassandra.driver.version>
- <snappy.version>1.1.2.6</snappy.version>
- <lz4.version>1.4.1</lz4.version>
+ <cassandra.driver.version>4.6.0</cassandra.driver.version>
+ <snappy.version>1.1.7.3</snappy.version>
+ <lz4.version>1.6.0</lz4.version>
+ <commons-lang.version>3.3.2</commons-lang.version>
<scalate.version>1.7.1</scalate.version>
- <cassandra.guava.version>19.0</cassandra.guava.version>
<!-- test library versions -->
- <achilles.version>3.2.4-Zeppelin</achilles.version>
<jna.version>4.2.0</jna.version>
+ <cassandra.unit.version>4.3.1.0</cassandra.unit.version>
<interpreter.name>cassandra</interpreter.name>
</properties>
@@ -52,7 +52,7 @@
<groupId>org.apache.zeppelin</groupId>
<artifactId>zeppelin-interpreter</artifactId>
<version>${project.version}</version>
- <scope>test</scope>
+ <scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.hamcrest</groupId>
@@ -62,17 +62,11 @@
</dependency>
<dependency>
- <groupId>com.datastax.cassandra</groupId>
- <artifactId>cassandra-driver-core</artifactId>
+ <groupId>com.datastax.oss</groupId>
+ <artifactId>java-driver-core</artifactId>
<version>${cassandra.driver.version}</version>
</dependency>
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- <version>${cassandra.guava.version}</version>
- </dependency>
-
<!-- Compression libraries for the cassandra-driver protocol. -->
<!-- Include both compression options to make to simplify deployment. -->
@@ -109,6 +103,13 @@
<scope>runtime</scope>
</dependency>
+ <!-- we'll switch back to it after removing support for Scala 2.10
+ <dependency>
+ <groupId>org.scala-lang.modules</groupId>
+ <artifactId>scala-java8-compat_${scala.binary.version}</artifactId>
+ <version>${scala.java8.compat.version}</version>
+ </dependency> -->
+
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
@@ -156,30 +157,19 @@
</dependency>
<dependency>
- <groupId>info.archinnov</groupId>
- <artifactId>achilles-embedded</artifactId>
- <version>${achilles.version}</version>
- <scope>test</scope>
- <exclusions>
- <exclusion>
- <groupId>ch.qos.logback</groupId>
- <artifactId>logback-core</artifactId>
- </exclusion>
- <exclusion>
- <groupId>ch.qos.logback</groupId>
- <artifactId>logback-classic</artifactId>
- </exclusion>
- <exclusion>
- <groupId>net.java.dev.jna</groupId>
- <artifactId>jna</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>log4j-over-slf4j</artifactId>
- </exclusion>
- </exclusions>
+ <groupId>org.cassandraunit</groupId>
+ <artifactId>cassandra-unit</artifactId>
+ <version>${cassandra.unit.version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>com.datastax.oss</groupId>
+ <artifactId>java-driver-core</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
+
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
diff --git a/cassandra/src/main/java/org/apache/zeppelin/cassandra/CassandraInterpreter.java b/cassandra/src/main/java/org/apache/zeppelin/cassandra/CassandraInterpreter.java
index 105e271..682bf2d 100644
--- a/cassandra/src/main/java/org/apache/zeppelin/cassandra/CassandraInterpreter.java
+++ b/cassandra/src/main/java/org/apache/zeppelin/cassandra/CassandraInterpreter.java
@@ -16,33 +16,36 @@
*/
package org.apache.zeppelin.cassandra;
-import static java.lang.Integer.parseInt;
-
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.CqlSessionBuilder;
+import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
+import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
+import com.datastax.oss.driver.api.core.config.ProgrammaticDriverConfigLoaderBuilder;
+import com.datastax.oss.driver.internal.core.loadbalancing.DcInferringLoadBalancingPolicy;
+import com.datastax.oss.driver.shaded.guava.common.net.InetAddresses;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
+import org.apache.zeppelin.scheduler.Scheduler;
+import org.apache.zeppelin.scheduler.SchedulerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManagerFactory;
import java.io.InputStream;
+import java.net.InetSocketAddress;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.KeyStore;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import java.util.Properties;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.TrustManagerFactory;
-
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.JdkSSLOptions;
-import com.datastax.driver.core.ProtocolOptions.Compression;
-import com.datastax.driver.core.Session;
-
-import org.apache.zeppelin.interpreter.Interpreter;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterResult;
-import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
-import org.apache.zeppelin.scheduler.Scheduler;
-import org.apache.zeppelin.scheduler.SchedulerFactory;
+import static java.lang.Integer.parseInt;
/**
* Interpreter for Apache Cassandra CQL query language.
@@ -68,24 +71,12 @@ public class CassandraInterpreter extends Interpreter {
"cassandra.speculative.execution.policy";
public static final String CASSANDRA_MAX_SCHEMA_AGREEMENT_WAIT_SECONDS =
"cassandra.max.schema.agreement.wait.second";
- public static final String CASSANDRA_POOLING_NEW_CONNECTION_THRESHOLD_LOCAL =
- "cassandra.pooling.new.connection.threshold.local";
- public static final String CASSANDRA_POOLING_NEW_CONNECTION_THRESHOLD_REMOTE =
- "cassandra.pooling.new.connection.threshold.remote";
- public static final String CASSANDRA_POOLING_MAX_CONNECTION_PER_HOST_LOCAL =
- "cassandra.pooling.max.connection.per.host.local";
- public static final String CASSANDRA_POOLING_MAX_CONNECTION_PER_HOST_REMOTE =
- "cassandra.pooling.max.connection.per.host.remote";
- public static final String CASSANDRA_POOLING_CORE_CONNECTION_PER_HOST_LOCAL =
- "cassandra.pooling.core.connection.per.host.local";
- public static final String CASSANDRA_POOLING_CORE_CONNECTION_PER_HOST_REMOTE =
- "cassandra.pooling.core.connection.per.host.remote";
- public static final String CASSANDRA_POOLING_MAX_REQUESTS_PER_CONNECTION_LOCAL =
- "cassandra.pooling.max.request.per.connection.local";
- public static final String CASSANDRA_POOLING_MAX_REQUESTS_PER_CONNECTION_REMOTE =
- "cassandra.pooling.max.request.per.connection.remote";
- public static final String CASSANDRA_POOLING_IDLE_TIMEOUT_SECONDS =
- "cassandra.pooling.idle.timeout.seconds";
+ public static final String CASSANDRA_POOLING_CONNECTION_PER_HOST_LOCAL =
+ "cassandra.pooling.connection.per.host.local";
+ public static final String CASSANDRA_POOLING_CONNECTION_PER_HOST_REMOTE =
+ "cassandra.pooling.connection.per.host.remote";
+ public static final String CASSANDRA_POOLING_MAX_REQUESTS_PER_CONNECTION =
+ "cassandra.pooling.max.request.per.connection";
public static final String CASSANDRA_POOLING_POOL_TIMEOUT_MILLIS =
"cassandra.pooling.pool.timeout.millisecs";
public static final String CASSANDRA_POOLING_HEARTBEAT_INTERVAL_SECONDS =
@@ -122,24 +113,15 @@ public class CassandraInterpreter extends Interpreter {
"cassandra.ssl.truststore.password";
- public static final String DEFAULT_HOST = "localhost";
+ public static final String DEFAULT_HOST = "127.0.0.1";
public static final String DEFAULT_PORT = "9042";
- public static final String DEFAULT_CLUSTER = "Test Cluster";
public static final String DEFAULT_KEYSPACE = "system";
- public static final String DEFAULT_PROTOCOL_VERSION = "4";
- public static final String DEFAULT_COMPRESSION = "NONE";
- public static final String DEFAULT_CREDENTIAL = "none";
+ public static final String DEFAULT_PROTOCOL_VERSION = "DEFAULT";
+ public static final String DEFAULT_COMPRESSION = "none";
+ public static final String DEFAULT_CONNECTIONS_PER_HOST = "1";
+ public static final String DEFAULT_MAX_REQUEST_PER_CONNECTION = "1024";
public static final String DEFAULT_POLICY = "DEFAULT";
public static final String DEFAULT_PARALLELISM = "10";
- static String defaultNewConnectionThresholdLocal = "100";
- static String defaultNewConnectionThresholdRemote = "100";
- static String defaultCoreConnectionPerHostLocal = "2";
- static String defaultCoreConnectionPerHostRemote = "1";
- static String defaultMaxConnectionPerHostLocal = "8";
- static String defaultMaxConnectionPerHostRemote = "2";
- static String defaultMaxRequestPerConnectionLocal = "1024";
- static String defaultMaxRequestPerConnectionRemote = "256";
- public static final String DEFAULT_IDLE_TIMEOUT = "120";
public static final String DEFAULT_POOL_TIMEOUT = "5000";
public static final String DEFAULT_HEARTBEAT_INTERVAL = "30";
public static final String DEFAULT_CONSISTENCY = "ONE";
@@ -148,19 +130,12 @@ public class CassandraInterpreter extends Interpreter {
public static final String DEFAULT_CONNECTION_TIMEOUT = "5000";
public static final String DEFAULT_READ_TIMEOUT = "12000";
public static final String DEFAULT_TCP_NO_DELAY = "true";
+ public static final String DEFAULT_MAX_SCHEMA_AGREEMENT_WAIT_SECONDS = "12";
- public static final String DOWNGRADING_CONSISTENCY_RETRY = "DOWNGRADING_CONSISTENCY";
- public static final String FALLTHROUGH_RETRY = "FALLTHROUGH";
- public static final String LOGGING_DEFAULT_RETRY = "LOGGING_DEFAULT";
- public static final String LOGGING_DOWNGRADING_RETRY = "LOGGING_DOWNGRADING";
- public static final String LOGGING_FALLTHROUGH_RETRY = "LOGGING_FALLTHROUGH";
-
- public static final List NO_COMPLETION = new ArrayList<>();
+ static final List NO_COMPLETION = new ArrayList<>();
InterpreterLogic helper;
- Cluster.Builder clusterBuilder;
- Cluster cluster;
- Session session;
+ CqlSession session;
private JavaDriverConfig driverConfig = new JavaDriverConfig();
public CassandraInterpreter(Properties properties) {
@@ -170,35 +145,58 @@ public class CassandraInterpreter extends Interpreter {
@Override
public void open() {
- final String[] addresses = getProperty(CASSANDRA_HOSTS).split(",");
- final int port = parseInt(getProperty(CASSANDRA_PORT));
- StringBuilder hosts = new StringBuilder();
+ final String[] addresses = getProperty(CASSANDRA_HOSTS, DEFAULT_HOST).split(",");
+ final int port = parseInt(getProperty(CASSANDRA_PORT, DEFAULT_PORT));
+ Collection<InetSocketAddress> hosts = new ArrayList<>();
for (String address : addresses) {
- hosts.append(address).append(",");
+ if (InetAddresses.isInetAddress(address)) {
+ hosts.add(new InetSocketAddress(address, port));
+ } else {
+ // TODO(alex): maybe it won't be necessary in 4.4
+ hosts.add(InetSocketAddress.createUnresolved(address, port));
+ }
}
- LOGGER.info("Bootstrapping Cassandra Java Driver to connect to " + hosts.toString() +
- "on port " + port);
-
- Compression compression = driverConfig.getCompressionProtocol(this);
-
- clusterBuilder = Cluster.builder()
- .addContactPoints(addresses)
- .withPort(port)
- .withProtocolVersion(driverConfig.getProtocolVersion(this))
- .withClusterName(getProperty(CASSANDRA_CLUSTER_NAME))
- .withCompression(compression)
- .withCredentials(getProperty(CASSANDRA_CREDENTIALS_USERNAME),
+ LOGGER.info("Bootstrapping Cassandra Java Driver to connect to " +
+ getProperty(CASSANDRA_HOSTS) + "on port " + port);
+
+ // start generation of the config
+ ProgrammaticDriverConfigLoaderBuilder configBuilder = DriverConfigLoader.programmaticBuilder();
+
+ driverConfig.setCompressionProtocol(this, configBuilder);
+ driverConfig.setPoolingOptions(this, configBuilder);
+ driverConfig.setProtocolVersion(this, configBuilder);
+ driverConfig.setQueryOptions(this, configBuilder);
+ driverConfig.setReconnectionPolicy(this, configBuilder);
+ driverConfig.setRetryPolicy(this, configBuilder);
+ driverConfig.setSocketOptions(this, configBuilder);
+ driverConfig.setSpeculativeExecutionPolicy(this, configBuilder);
+
+ //
+ configBuilder.withClass(DefaultDriverOption.LOAD_BALANCING_POLICY_CLASS,
+ DcInferringLoadBalancingPolicy.class);
+ configBuilder.withBoolean(DefaultDriverOption.RESOLVE_CONTACT_POINTS, false);
+
+ configBuilder.withInt(DefaultDriverOption.CONTROL_CONNECTION_AGREEMENT_TIMEOUT,
+ parseInt(getProperty(CASSANDRA_MAX_SCHEMA_AGREEMENT_WAIT_SECONDS,
+ DEFAULT_MAX_SCHEMA_AGREEMENT_WAIT_SECONDS)));
+
+ DriverConfigLoader loader = configBuilder.endProfile().build();
+ // TODO(alex): think how to dump built configuration...
+ logger.debug(loader.toString());
+ // end generation of config
+
+ CqlSessionBuilder clusterBuilder = CqlSession.builder()
+ .addContactPoints(hosts)
+ .withAuthCredentials(getProperty(CASSANDRA_CREDENTIALS_USERNAME),
getProperty(CASSANDRA_CREDENTIALS_PASSWORD))
- .withLoadBalancingPolicy(driverConfig.getLoadBalancingPolicy(this))
- .withRetryPolicy(driverConfig.getRetryPolicy(this))
- .withReconnectionPolicy(driverConfig.getReconnectionPolicy(this))
- .withSpeculativeExecutionPolicy(driverConfig.getSpeculativeExecutionPolicy(this))
- .withMaxSchemaAgreementWaitSeconds(
- parseInt(getProperty(CASSANDRA_MAX_SCHEMA_AGREEMENT_WAIT_SECONDS)))
- .withPoolingOptions(driverConfig.getPoolingOptions(this))
- .withQueryOptions(driverConfig.getQueryOptions(this))
- .withSocketOptions(driverConfig.getSocketOptions(this));
+ .withApplicationName("")
+ .withApplicationVersion("");
+
+ String keyspace = getProperty(CASSANDRA_KEYSPACE_NAME, DEFAULT_KEYSPACE);
+ if (StringUtils.isNotBlank(keyspace) && !DEFAULT_KEYSPACE.equalsIgnoreCase(keyspace)) {
+ clusterBuilder.withKeyspace(keyspace);
+ }
final String runWithSSL = getProperty(CASSANDRA_WITH_SSL);
if (runWithSSL != null && runWithSSL.equals("true")) {
@@ -219,9 +217,7 @@ public class CassandraInterpreter extends Interpreter {
sslContext = SSLContext.getInstance("TLS");
sslContext.init(null, trustManagerFactory.getTrustManagers(), null);
}
- clusterBuilder = clusterBuilder.withSSL(JdkSSLOptions.builder()
- .withSSLContext(sslContext)
- .build());
+ clusterBuilder = clusterBuilder.withSslContext(sslContext);
} catch (Exception e) {
LOGGER.error(e.toString());
}
@@ -229,15 +225,13 @@ public class CassandraInterpreter extends Interpreter {
LOGGER.debug("Cassandra Interpreter: Not using SSL");
}
- cluster = clusterBuilder.build();
- session = cluster.connect();
+ session = clusterBuilder.withConfigLoader(loader).build();
helper = new InterpreterLogic(session);
}
@Override
public void close() {
session.close();
- cluster.close();
}
@Override
@@ -269,6 +263,6 @@ public class CassandraInterpreter extends Interpreter {
public Scheduler getScheduler() {
return SchedulerFactory.singleton()
.createOrGetParallelScheduler(CassandraInterpreter.class.getName() + this.hashCode(),
- parseInt(getProperty(CASSANDRA_INTERPRETER_PARALLELISM)));
+ parseInt(getProperty(CASSANDRA_INTERPRETER_PARALLELISM, DEFAULT_PARALLELISM)));
}
}
diff --git a/cassandra/src/main/resources/interpreter-setting.json b/cassandra/src/main/resources/interpreter-setting.json
index 0f0d58c..f8d2951 100644
--- a/cassandra/src/main/resources/interpreter-setting.json
+++ b/cassandra/src/main/resources/interpreter-setting.json
@@ -64,28 +64,28 @@
"envName": null,
"propertyName": "cassandra.load.balancing.policy",
"defaultValue": "DEFAULT",
- "description": "Cassandra Load Balancing Policy. Default = new TokenAwarePolicy(new DCAwareRoundRobinPolicy())",
+ "description": "Class name for Load Balancing Policy. Default = DefaultLoadBalancingPolicy",
"type": "string"
},
"cassandra.retry.policy": {
"envName": null,
"propertyName": "cassandra.retry.policy",
"defaultValue": "DEFAULT",
- "description": "Cassandra Retry Policy. Default = DefaultRetryPolicy.INSTANCE",
+ "description": "Class name for Retry Policy. Default = DefaultRetryPolicy",
"type": "string"
},
"cassandra.reconnection.policy": {
"envName": null,
"propertyName": "cassandra.reconnection.policy",
"defaultValue": "DEFAULT",
- "description": "Cassandra Reconnection Policy. Default = new ExponentialReconnectionPolicy(1000, 10 * 60 * 1000)",
+ "description": "Class name for Reconnection Policy. Default = ExponentialReconnectionPolicy",
"type": "string"
},
"cassandra.speculative.execution.policy": {
"envName": null,
"propertyName": "cassandra.speculative.execution.policy",
"defaultValue": "DEFAULT",
- "description": "Cassandra Speculative Execution Policy. Default = NoSpeculativeExecutionPolicy.INSTANCE",
+ "description": "Class name for Speculative Execution Policy. Default = NoSpeculativeExecutionPolicy",
"type": "string"
},
"cassandra.interpreter.parallelism": {
@@ -102,67 +102,25 @@
"description": "Cassandra max schema agreement wait in second.Default = ProtocolOptions.DEFAULT_MAX_SCHEMA_AGREEMENT_WAIT_SECONDS",
"type": "number"
},
- "cassandra.pooling.new.connection.threshold.local": {
+ "cassandra.pooling.connection.per.host.local": {
"envName": null,
- "propertyName": "cassandra.pooling.new.connection.threshold.local",
- "defaultValue": "100",
- "description": "Cassandra new connection threshold local. Protocol V2 and below default = 100 Protocol V3 and above default = 800",
- "type": "number"
- },
- "cassandra.pooling.new.connection.threshold.remote": {
- "envName": null,
- "propertyName": "cassandra.pooling.new.connection.threshold.remote",
- "defaultValue": "100",
- "description": "Cassandra new connection threshold remove. Protocol V2 and below default = 100 Protocol V3 and above default = 200",
- "type": "number"
- },
- "cassandra.pooling.core.connection.per.host.local": {
- "envName": null,
- "propertyName": "cassandra.pooling.core.connection.per.host.local",
- "defaultValue": "2",
- "description": "Cassandra core connection per host local. Protocol V2 and below default = 2 Protocol V3 and above default = 1",
- "type": "number"
- },
- "cassandra.pooling.core.connection.per.host.remote": {
- "envName": null,
- "propertyName": "cassandra.pooling.core.connection.per.host.remote",
- "defaultValue": "1",
- "description": "Cassandra core connection per host remove. Protocol V2 and below default = 1 Protocol V3 and above default = 1",
- "type": "number"
- },
- "cassandra.pooling.max.connection.per.host.local": {
- "envName": null,
- "propertyName": "cassandra.pooling.max.connection.per.host.local",
+ "propertyName": "cassandra.pooling.connection.per.host.local",
"defaultValue": "8",
- "description": "Cassandra max connection per host local. Protocol V2 and below default = 8 Protocol V3 and above default = 1",
+ "description": "Cassandra connections per host local. Protocol V2 and below default = 8 Protocol V3 and above default = 1",
"type": "number"
},
- "cassandra.pooling.max.connection.per.host.remote": {
+ "cassandra.pooling.connection.per.host.remote": {
"envName": null,
- "propertyName": "cassandra.pooling.max.connection.per.host.remote",
+ "propertyName": "cassandra.pooling.connection.per.host.remote",
"defaultValue": "2",
- "description": "Cassandra max connection per host remote. Protocol V2 and below default = 2 Protocol V3 and above default = 1",
+ "description": "Cassandra connections per host remote. Protocol V2 and below default = 2 Protocol V3 and above default = 1",
"type": "number"
},
- "cassandra.pooling.max.request.per.connection.local": {
+ "cassandra.pooling.max.request.per.connection": {
"envName": null,
- "propertyName": "cassandra.pooling.max.request.per.connection.local",
+ "propertyName": "cassandra.pooling.max.request.per.connection",
"defaultValue": "1024",
- "description": "Cassandra max request per connection local. Protocol V2 and below default = 128 Protocol V3 and above default = 1024",
- "type": "number"
- },
- "cassandra.pooling.max.request.per.connection.remote": {
- "envName": null,
- "propertyName": "cassandra.pooling.max.request.per.connection.remote",
- "defaultValue": "256",
- "description": "Cassandra max request per connection remote. Protocol V2 and below default = 128 Protocol V3 and above default = 256",
- "type": "number"
- },
- "cassandra.pooling.idle.timeout.seconds": {
- "envName": null,
- "propertyName": "cassandra.pooling.idle.timeout.seconds",
- "defaultValue": "120",
- "description": "Cassandra idle time out in seconds. Default = 120",
+ "description": "Cassandra max requests per connection. Protocol V2 and below default = 128 Protocol V3 and above default = 1024",
"type": "number"
},
"cassandra.pooling.pool.timeout.millisecs": {
diff --git a/cassandra/src/main/resources/scalate/allAggregates.ssp b/cassandra/src/main/resources/scalate/allAggregates.ssp
index f093c50..e806d73 100644
--- a/cassandra/src/main/resources/scalate/allAggregates.ssp
+++ b/cassandra/src/main/resources/scalate/allAggregates.ssp
@@ -19,7 +19,7 @@
#import(org.apache.zeppelin.cassandra.MetaDataHierarchy._)
#import(java.util.UUID)
-<%@ val allAggregates: Map[(UUID, String), List[AggregateSummary]] %>
+<%@ val allAggregates: Map[(UUID, String), Seq[AggregateSummary]] %>
<div class="container">
diff --git a/cassandra/src/main/resources/scalate/allFunctions.ssp b/cassandra/src/main/resources/scalate/allFunctions.ssp
index fc756a4..e026687 100644
--- a/cassandra/src/main/resources/scalate/allFunctions.ssp
+++ b/cassandra/src/main/resources/scalate/allFunctions.ssp
@@ -19,7 +19,7 @@
#import(org.apache.zeppelin.cassandra.MetaDataHierarchy._)
#import(java.util.UUID)
-<%@ val allFunctions: Map[(UUID, String), List[FunctionSummary]] %>
+<%@ val allFunctions: Map[(UUID, String), Seq[FunctionSummary]] %>
<div class="container">
diff --git a/cassandra/src/main/resources/scalate/allMaterializedViews.ssp b/cassandra/src/main/resources/scalate/allMaterializedViews.ssp
index 8ffee57..58efdb4 100644
--- a/cassandra/src/main/resources/scalate/allMaterializedViews.ssp
+++ b/cassandra/src/main/resources/scalate/allMaterializedViews.ssp
@@ -18,7 +18,7 @@
--%>
#import(org.apache.zeppelin.cassandra.MetaDataHierarchy._)
#import(java.util.UUID)
-<%@ val allMVs: Map[(UUID,String),List[MaterializedViewSummary]] %>
+<%@ val allMVs: Map[(UUID,String),Seq[MaterializedViewSummary]] %>
<div class="container">
<div class="row">
diff --git a/cassandra/src/main/resources/scalate/allTables.ssp b/cassandra/src/main/resources/scalate/allTables.ssp
index 810881b..0464325 100644
--- a/cassandra/src/main/resources/scalate/allTables.ssp
+++ b/cassandra/src/main/resources/scalate/allTables.ssp
@@ -18,7 +18,7 @@
--%>
#import(org.apache.zeppelin.cassandra.MetaDataHierarchy._)
#import(java.util.UUID)
-<%@ val allTables: Map[(UUID,String),List[String]] %>
+<%@ val allTables: Map[(UUID,String),Seq[String]] %>
<div class="container">
<div class="row">
diff --git a/cassandra/src/main/resources/scalate/allUDTs.ssp b/cassandra/src/main/resources/scalate/allUDTs.ssp
index 559ef41..0fa40ee 100644
--- a/cassandra/src/main/resources/scalate/allUDTs.ssp
+++ b/cassandra/src/main/resources/scalate/allUDTs.ssp
@@ -18,7 +18,7 @@
--%>
#import(org.apache.zeppelin.cassandra.MetaDataHierarchy._)
#import(java.util.UUID)
-<%@ val allUDTs: Map[(UUID,String),List[String]] %>
+<%@ val allUDTs: Map[(UUID,String),Seq[String]] %>
<div class="container">
<div class="row">
diff --git a/cassandra/src/main/resources/scalate/helpMenu.ssp b/cassandra/src/main/resources/scalate/helpMenu.ssp
index c61a68f..c40c81a 100644
--- a/cassandra/src/main/resources/scalate/helpMenu.ssp
+++ b/cassandra/src/main/resources/scalate/helpMenu.ssp
@@ -19,17 +19,17 @@
#import(java.util.UUID)
-#import(com.datastax.driver.core.utils.UUIDs)
-
-<%@ val basicCommandsId: UUID = UUIDs.random() %>
-<%@ val schemaDiscoveryId: UUID = UUIDs.random() %>
-<%@ val queryParamsId: UUID = UUIDs.random() %>
-<%@ val preparedStatementsId: UUID = UUIDs.random() %>
-<%@ val dynamicFormsId: UUID = UUIDs.random() %>
-<%@ val configurationId: UUID = UUIDs.random() %>
-<%@ val sharedStatesId: UUID = UUIDs.random() %>
-<%@ val changelogId: UUID = UUIDs.random() %>
-<%@ val contactsId: UUID = UUIDs.random() %>
+#import(com.datastax.oss.driver.api.core.uuid.Uuids)
+
+<%@ val basicCommandsId: UUID = Uuids.random() %>
+<%@ val schemaDiscoveryId: UUID = Uuids.random() %>
+<%@ val queryParamsId: UUID = Uuids.random() %>
+<%@ val preparedStatementsId: UUID = Uuids.random() %>
+<%@ val dynamicFormsId: UUID = Uuids.random() %>
+<%@ val configurationId: UUID = Uuids.random() %>
+<%@ val sharedStatesId: UUID = Uuids.random() %>
+<%@ val changelogId: UUID = Uuids.random() %>
+<%@ val contactsId: UUID = Uuids.random() %>
<br/>
<br/>
@@ -463,11 +463,6 @@
</td>
</tr>
<tr>
- <td>Retry Policy</td>
- <td><strong>@retryPolicy=<em>value</em></strong></td>
- <td>Apply the given retry policy to all queries in the paragraph</td>
- </tr>
- <tr>
<td>Fetch Size</td>
<td><strong>@fetchSize=<em>int value</em></strong></td>
<td>Apply the given fetch size to all queries in the paragraph</td>
@@ -507,15 +502,6 @@
<td>Any long value</td>
</tr>
<tr>
- <td>Retry Policy</td>
- <td>
- <strong>
- DEFAULT, DOWNGRADING_CONSISTENCY, FALLTHROUGH, LOGGING_DEFAULT,
- LOGGING_DOWNGRADING, LOGGING_FALLTHROUGH
- </strong>
- </td>
- </tr>
- <tr>
<td>Fetch Size</td>
<td>Any integer value</td>
</tr>
diff --git a/cassandra/src/main/resources/scalate/materializedViewDetails.ssp b/cassandra/src/main/resources/scalate/materializedViewDetails.ssp
index 459b5bc8..182776a 100644
--- a/cassandra/src/main/resources/scalate/materializedViewDetails.ssp
+++ b/cassandra/src/main/resources/scalate/materializedViewDetails.ssp
@@ -52,7 +52,7 @@
<i class="glyphicon glyphicon-fullscreen" title="Partition Key"/>
</td>
<td class="col-md-4">${column.name}</td>
- <td class="col-md-4">${column.dataType}</td>
+ <td class="col-md-4">${column.dataType.asCql(true,true)}</td>
</tr>
#case(StaticColumn)
<tr class="warning">
@@ -60,7 +60,7 @@
<i class="glyphicon glyphicon-pushpin" title="Static Column"/>
</td>
<td class="col-md-4">${column.name}</td>
- <td class="col-md-4">${column.dataType}</td>
+ <td class="col-md-4">${column.dataType.asCql(true,true)}</td>
</tr>
#case(ClusteringColumn(ASC))
<tr class="success">
@@ -70,7 +70,7 @@
<i class="glyphicon glyphicon-sort-by-attributes" title="Sort ASC"/>
</td>
<td class="col-md-4">${column.name}</td>
- <td class="col-md-4">${column.dataType}</td>
+ <td class="col-md-4">${column.dataType.asCql(true,true)}</td>
</tr>
#case(ClusteringColumn(DESC))
<tr class="success">
@@ -80,13 +80,13 @@
<i class="glyphicon glyphicon-sort-by-attributes-alt" title="Sort DESC"/>
</td>
<td class="col-md-4">${column.name}</td>
- <td class="col-md-4">${column.dataType}</td>
+ <td class="col-md-4">${column.dataType.asCql(true,true)}</td>
</tr>
#otherwise
<tr>
<td class="col-md-4"></td>
<td class="col-md-4">${column.name}</td>
- <td class="col-md-4">${column.dataType}</td>
+ <td class="col-md-4">${column.dataType.asCql(true,true)}</td>
</tr>
#end
#end
diff --git a/cassandra/src/main/resources/scalate/noResultWithExecutionInfo.ssp b/cassandra/src/main/resources/scalate/noResultWithExecutionInfo.ssp
index 1f709a4..465558a 100644
--- a/cassandra/src/main/resources/scalate/noResultWithExecutionInfo.ssp
+++ b/cassandra/src/main/resources/scalate/noResultWithExecutionInfo.ssp
@@ -17,7 +17,6 @@
*/
--%>
<%@ val query: String%>
-<%@ val consistency: String%>
<%@ val triedHosts: String%>
<%@ val queriedHosts: String%>
<%@ val schemaInAgreement: String%>
@@ -43,10 +42,6 @@
<td>${query}</td>
</tr>
<tr>
- <td>Achieved Consistency</td>
- <td>${consistency}</td>
- </tr>
- <tr>
<td>Tried Hosts</td>
<td>${triedHosts}</td>
</tr>
@@ -55,7 +50,7 @@
<td>${queriedHosts}</td>
</tr>
<tr>
- <td>Schema In Agreement</td>
+ <td>Schema in Agreement</td>
<td>${schemaInAgreement}</td>
</tr>
</tbody>
diff --git a/cassandra/src/main/resources/scalate/tableDetails.ssp b/cassandra/src/main/resources/scalate/tableDetails.ssp
index 41e4517..0938c8e 100644
--- a/cassandra/src/main/resources/scalate/tableDetails.ssp
+++ b/cassandra/src/main/resources/scalate/tableDetails.ssp
@@ -44,7 +44,7 @@
<i class="glyphicon glyphicon-fullscreen" title="Partition Key"/>
</td>
<td class="col-md-4">${column.name}</td>
- <td class="col-md-4">${column.dataType}</td>
+ <td class="col-md-4">${column.dataType.asCql(true,true)}</td>
</tr>
#case(StaticColumn)
<tr class="warning">
@@ -52,7 +52,7 @@
<i class="glyphicon glyphicon-pushpin" title="Static Column"/>
</td>
<td class="col-md-4">${column.name}</td>
- <td class="col-md-4">${column.dataType}</td>
+ <td class="col-md-4">${column.dataType.asCql(true,true)}</td>
</tr>
#case(ClusteringColumn(ASC))
<tr class="success">
@@ -62,7 +62,7 @@
<i class="glyphicon glyphicon-sort-by-attributes" title="Sort ASC"/>
</td>
<td class="col-md-4">${column.name}</td>
- <td class="col-md-4">${column.dataType}</td>
+ <td class="col-md-4">${column.dataType.asCql(true,true)}</td>
</tr>
#case(ClusteringColumn(DESC))
<tr class="success">
@@ -72,13 +72,13 @@
<i class="glyphicon glyphicon-sort-by-attributes-alt" title="Sort DESC"/>
</td>
<td class="col-md-4">${column.name}</td>
- <td class="col-md-4">${column.dataType}</td>
+ <td class="col-md-4">${column.dataType.asCql(true,true)}</td>
</tr>
#otherwise
<tr>
<td class="col-md-4"></td>
<td class="col-md-4">${column.name}</td>
- <td class="col-md-4">${column.dataType}</td>
+ <td class="col-md-4">${column.dataType.asCql(true,true)}</td>
</tr>
#end
#end
diff --git a/cassandra/src/main/resources/scalate/udtDetails.ssp b/cassandra/src/main/resources/scalate/udtDetails.ssp
index 28858f2..804033c 100644
--- a/cassandra/src/main/resources/scalate/udtDetails.ssp
+++ b/cassandra/src/main/resources/scalate/udtDetails.ssp
@@ -39,7 +39,7 @@
<tr>
<td class="col-md-6">${column.name}</td>
- <td class="col-md-6">${column.dataType}</td>
+ <td class="col-md-6">${column.dataType.asCql(true,true)}</td>
</tr>
#end
diff --git a/cassandra/src/main/scala/com/datastax/driver/core/TableMetadataWrapper.scala b/cassandra/src/main/scala/com/datastax/driver/core/TableMetadataWrapper.scala
index feddbe1..489b6fa 100644
--- a/cassandra/src/main/scala/com/datastax/driver/core/TableMetadataWrapper.scala
+++ b/cassandra/src/main/scala/com/datastax/driver/core/TableMetadataWrapper.scala
@@ -16,8 +16,10 @@
*/
package com.datastax.driver.core
+import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata
+
case class TableMetadataWrapper(val meta: TableMetadata) {
def exportTableOnlyAsString(): String = {
- meta.asCQLQuery(true)
+ meta.describe(true)
}
}
diff --git a/cassandra/src/main/scala/org/apache/zeppelin/cassandra/BoundValuesParser.scala b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/BoundValuesParser.scala
index 3268650..5ebc507 100644
--- a/cassandra/src/main/scala/org/apache/zeppelin/cassandra/BoundValuesParser.scala
+++ b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/BoundValuesParser.scala
@@ -16,18 +16,15 @@
*/
package org.apache.zeppelin.cassandra
-import java.text.SimpleDateFormat
-import java.util.{Date}
-
+import scala.util.matching.Regex
import scala.util.parsing.combinator._
/**
* Parser of bound values passed into @bind parameters
*/
class BoundValuesParser extends RegexParsers with JavaTokenParsers {
-
- val STANDARD_DATE_PATTERN = """(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})""".r
- val ACCURATE_DATE_PATTERN = """(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3})""".r
+ val STANDARD_DATE_PATTERN: Regex = """(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})""".r
+ val ACCURATE_DATE_PATTERN: Regex = """(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3})""".r
def value : Parser[String] = "null" | "true" | "false" | zeppelinVariable |
map | list | set | tuple| udt |
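
Note on the two date patterns above: a minimal sketch, assuming the patterns are used as whole-string extractors. The object name `DatePatternSketch` and the helper `classify` are hypothetical and are not part of the commit; only the two regexes are copied from the diff.

```scala
import scala.util.matching.Regex

object DatePatternSketch {
  // Copied from the diff above.
  val STANDARD_DATE_PATTERN: Regex = """(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})""".r
  val ACCURATE_DATE_PATTERN: Regex = """(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3})""".r

  // Hypothetical helper: a Regex used as an extractor must match the whole string,
  // so the millisecond form is tried first and the two cases never overlap.
  def classify(literal: String): String = literal match {
    case ACCURATE_DATE_PATTERN(_) => "accurate"
    case STANDARD_DATE_PATTERN(_) => "standard"
    case _                        => "other"
  }

  def main(args: Array[String]): Unit = {
    println(classify("2020-05-01 11:28:34"))
    println(classify("2020-05-01 11:28:34.123"))
    println(classify("2020-05-01"))
  }
}
```

The explicit `Regex` type annotation added in this commit does not change matching behaviour; it only makes the public member types explicit.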
diff --git a/cassandra/src/main/scala/org/apache/zeppelin/cassandra/DisplaySystem.scala b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/DisplaySystem.scala
index 135d4cd..382bd46 100644
--- a/cassandra/src/main/scala/org/apache/zeppelin/cassandra/DisplaySystem.scala
+++ b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/DisplaySystem.scala
@@ -18,13 +18,19 @@ package org.apache.zeppelin.cassandra
import java.util.UUID
-import com.datastax.driver.core.utils.UUIDs
import org.apache.zeppelin.cassandra.MetaDataHierarchy._
import org.fusesource.scalate.TemplateEngine
import scala.collection.JavaConverters._
-
+import scala.compat.java8.OptionConverters._
import com.datastax.driver.core._
+import com.datastax.oss.driver.api.core.CqlIdentifier
+import com.datastax.oss.driver.api.core.`type`.UserDefinedType
+import com.datastax.oss.driver.api.core.`type`.codec.registry.CodecRegistry
+import com.datastax.oss.driver.api.core.cql.ExecutionInfo
+import com.datastax.oss.driver.api.core.metadata.{Metadata, Node}
+import com.datastax.oss.driver.api.core.metadata.schema.{AggregateMetadata, ColumnMetadata, FunctionMetadata, FunctionSignature, IndexMetadata, KeyspaceMetadata, RelationMetadata, TableMetadata, ViewMetadata}
+import com.datastax.oss.driver.api.core.uuid.Uuids
import scala.collection.immutable.ListMap
@@ -61,7 +67,14 @@ object DisplaySystem {
val CLUSTER_CONTENT_TEMPLATE = "scalate/clusterContent.ssp"
val KEYSPACE_CONTENT_TEMPLATE = "scalate/keyspaceContent.ssp"
-
+ val DEFAULT_CLUSTER_NAME = "Test Cluster"
+ def clusterName(meta: Metadata): String = {
+ if (meta != null) {
+ meta.getClusterName.orElse(DEFAULT_CLUSTER_NAME)
+ } else {
+ DEFAULT_CLUSTER_NAME
+ }
+ }
object TableDisplay {
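
Note on `clusterName` above: the helper relies on `getClusterName` returning a `java.util.Optional[String]`, which is why `orElse` is called on it. A minimal sketch of the same fallback logic follows, assuming a hypothetical stand-in trait `HasClusterName` in place of the driver's `Metadata` so that it runs without the driver on the classpath.

```scala
import java.util.Optional

object ClusterNameSketch {
  val DEFAULT_CLUSTER_NAME = "Test Cluster"

  // Hypothetical stand-in for the single driver method used by the helper.
  trait HasClusterName {
    def getClusterName: Optional[String]
  }

  // Same shape as the helper in the diff: fall back to the default name
  // when the metadata is null or the name is absent.
  def clusterName(meta: HasClusterName): String = {
    if (meta != null) {
      meta.getClusterName.orElse(DEFAULT_CLUSTER_NAME)
    } else {
      DEFAULT_CLUSTER_NAME
    }
  }

  // Small factory for the stand-in, used only by this sketch.
  def metaWith(name: Optional[String]): HasClusterName = new HasClusterName {
    def getClusterName: Optional[String] = name
  }

  def main(args: Array[String]): Unit = {
    println(clusterName(metaWith(Optional.of("prod"))))
    println(clusterName(metaWith(Optional.empty[String]())))
    println(clusterName(null))
  }
}
```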
@@ -70,60 +83,61 @@ object DisplaySystem {
}
protected[DisplaySystem] def formatWithoutMenu(meta: TableMetadata, withCaption: Boolean): String = {
- val tableName: String = meta.getName
+ val tableName: String = meta.getName.asCql(true)
val columnsDetails = MetaDataConverter.Table.tableMetaToColumnDetails(meta)
val indicesDetails = MetaDataConverter.Table.tableMetaToIndexDetails(meta)
val indicesAsCQL = indicesDetails.map(_.asCQL).mkString("\n")
engine.layout(TABLE_DETAILS_TEMPLATE,
- Map[String, Any]("tableDetails" -> TableDetails(tableName, columnsDetails, indicesDetails, TableMetadataWrapper(meta).exportTableOnlyAsString(), indicesAsCQL), "withCaption" -> withCaption))
+ Map[String, Any]("tableDetails" -> TableDetails(tableName, columnsDetails, indicesDetails,
+ TableMetadataWrapper(meta).exportTableOnlyAsString(), indicesAsCQL), "withCaption" -> withCaption))
}
}
object UDTDisplay {
- def format(statement: String, userType: UserType, withCaption: Boolean): String = {
+ def format(statement: String, userType: UserDefinedType, withCaption: Boolean): String = {
MenuDisplay.formatMenu(statement) ++ formatWithoutMenu(userType, withCaption)
}
- protected[DisplaySystem] def formatWithoutMenu(userType: UserType, withCaption: Boolean): String = {
- val udtName: String = userType.getTypeName
+ protected[DisplaySystem] def formatWithoutMenu(userType: UserDefinedType, withCaption: Boolean): String = {
+ val udtName: String = userType.getName.asCql(true)
val columnsDetails = MetaDataConverter.UDT.userTypeToColumnDetails(userType)
engine.layout(UDT_DETAILS_TEMPLATE,
- Map[String, Any]("udtDetails" -> UDTDetails(udtName,columnsDetails,userType.exportAsString()), "withCaption" -> withCaption))
+ Map[String, Any]("udtDetails" -> UDTDetails(udtName, columnsDetails, userType.describe(true)), "withCaption" -> withCaption))
}
}
object FunctionDisplay {
- def format(statement: String, functions: List[FunctionMetadata], withCaption: Boolean): String = {
+ def format(statement: String, functions: Seq[FunctionMetadata], withCaption: Boolean): String = {
MenuDisplay.formatMenu(statement) ++ formatWithoutMenu(functions, withCaption)
}
- protected[DisplaySystem] def formatWithoutMenu(functions: List[FunctionMetadata], withCaption: Boolean): String = {
- val functionDetails: List[FunctionDetails] = functions.map(MetaDataConverter.functionMetaToFunctionDetails(_))
+ protected[DisplaySystem] def formatWithoutMenu(functions: Seq[FunctionMetadata], withCaption: Boolean): String = {
+ val functionDetails: Seq[FunctionDetails] = functions.map(MetaDataConverter.functionMetaToFunctionDetails)
engine.layout(FUNCTION_DETAILS_TEMPLATE,
Map[String, Any]("sameNameFunctionDetails" -> SameNameFunctionDetails(functionDetails), "withCaption" -> withCaption))
}
}
object AggregateDisplay {
- def format(statement: String, aggregates: List[AggregateMetadata], withCaption: Boolean, codecRegistry: CodecRegistry): String = {
+ def format(statement: String, aggregates: Seq[AggregateMetadata], withCaption: Boolean, codecRegistry: CodecRegistry): String = {
MenuDisplay.formatMenu(statement) ++ formatWithoutMenu(aggregates, withCaption, codecRegistry)
}
- protected[DisplaySystem] def formatWithoutMenu(aggregates: List[AggregateMetadata], withCaption: Boolean, codecRegistry: CodecRegistry): String = {
- val aggDetails: List[AggregateDetails] = aggregates.map(agg => MetaDataConverter.aggregateMetaToAggregateDetails(codecRegistry, agg))
+ protected[DisplaySystem] def formatWithoutMenu(aggregates: Seq[AggregateMetadata], withCaption: Boolean, codecRegistry: CodecRegistry): String = {
+ val aggDetails: Seq[AggregateDetails] = aggregates.map(agg => MetaDataConverter.aggregateMetaToAggregateDetails(codecRegistry, agg))
engine.layout(AGGREGATE_DETAILS_TEMPLATE,
Map[String, Any]("sameNameAggregateDetails" -> SameNameAggregateDetails(aggDetails), "withCaption" -> withCaption))
}
}
object MaterializedViewDisplay {
- def format(statement: String, mv: MaterializedViewMetadata, withCaption: Boolean): String = {
+ def format(statement: String, mv: ViewMetadata, withCaption: Boolean): String = {
MenuDisplay.formatMenu(statement) ++ formatWithoutMenu(mv, withCaption)
}
- protected[DisplaySystem] def formatWithoutMenu(mv: MaterializedViewMetadata, withCaption: Boolean): String = {
+ protected[DisplaySystem] def formatWithoutMenu(mv: ViewMetadata, withCaption: Boolean): String = {
val mvDetails = MetaDataConverter.mvMetaToMaterializedViewDetails(mv)
engine.layout(MATERIALIZED_VIEW_DETAILS_TEMPLATE,
Map[String, Any]("mvDetails" -> mvDetails, "withCaption" -> withCaption))
@@ -133,44 +147,49 @@ object DisplaySystem {
object KeyspaceDisplay {
private def formatCQLQuery(cql: String): String = {
- cql.replaceAll(""" WITH REPLICATION = \{"""," WITH REPLICATION = \\{")
- .replaceAll("('[^']+'\\s*:\\s+'[^']+',?)","\n\t$1")
- .replaceAll(""" \} AND DURABLE_WRITES = """," \\}\nAND DURABLE_WRITES = ")
+ cql.replaceAll(""" WITH REPLICATION = \{""", " WITH REPLICATION = \\{")
+ .replaceAll("('[^']+'\\s*:\\s+'[^']+',?)", "\n\t$1")
+ .replaceAll(""" \} AND DURABLE_WRITES = """, " \\}\nAND DURABLE_WRITES = ")
}
protected[cassandra] def formatKeyspaceOnly(meta: KeyspaceMetadata, withCaption: Boolean): String = {
- val ksDetails = KeyspaceDetails(meta.getName,
+ val ksDetails = KeyspaceDetails(meta.getName.asCql(true),
meta.getReplication.asScala.toMap,
meta.isDurableWrites,
- formatCQLQuery(meta.asCQLQuery()))
+ formatCQLQuery(meta.describe(true)))
engine.layout(KEYSPACE_DETAILS_TEMPLATE,
Map[String, Any]("ksDetails" -> ksDetails, "withCaption" -> withCaption))
}
def formatKeyspaceContent(statement: String, meta: KeyspaceMetadata, codecRegistry: CodecRegistry): String = {
- val ksName: String = meta.getName
- val ksDetails = formatKeyspaceOnly(meta, true)
-
- val tableDetails: List[(UUID, String, String)] = meta.getTables.asScala.toList
- .sortBy(_.getName)
- .map(table => (UUIDs.timeBased(), table.getName, TableDisplay.formatWithoutMenu(table, false)))
-
- val viewDetails: List[(UUID, String, String)] = meta.getMaterializedViews.asScala.toList
- .sortBy(_.getName)
- .map(view => (UUIDs.timeBased(), view.getName, MaterializedViewDisplay.formatWithoutMenu(view, false)))
-
- val udtDetails: List[(UUID, String, String)] = meta.getUserTypes.asScala.toList
- .sortBy(_.getTypeName)
- .map(udt => (UUIDs.timeBased(), udt.getTypeName, UDTDisplay.formatWithoutMenu(udt, false)))
-
- val functionDetails: List[(UUID, String, String)] = meta.getFunctions.asScala.toList
- .sortBy(_.getSimpleName)
- .map(function => (UUIDs.timeBased(), function.getSimpleName, FunctionDisplay.formatWithoutMenu(List(function), false)))
-
- val aggregateDetails: List[(UUID, String, String)] = meta.getAggregates.asScala.toList
- .sortBy(_.getSimpleName)
- .map(agg => (UUIDs.timeBased(), agg.getSimpleName, AggregateDisplay.formatWithoutMenu(List(agg), false, codecRegistry)))
+ val ksName: String = meta.getName.asCql(true)
+ val ksDetails = formatKeyspaceOnly(meta, withCaption = true)
+
+ val tableDetails: Seq[(UUID, String, String)] = meta.getTables.asScala.toSeq
+ .sortBy(_._1.asCql(true))
+ .map(t => (Uuids.timeBased(), t._1.asCql(true),
+ TableDisplay.formatWithoutMenu(t._2, withCaption = false)))
+
+ val viewDetails: Seq[(UUID, String, String)] = meta.getViews.asScala.toSeq
+ .sortBy(_._1.asCql(true))
+ .map(v => (Uuids.timeBased(), v._1.asCql(true),
+ MaterializedViewDisplay.formatWithoutMenu(v._2, withCaption = false)))
+
+ val udtDetails: Seq[(UUID, String, String)] = meta.getUserDefinedTypes.asScala.toSeq
+ .sortBy(_._1.asCql(true))
+ .map(udt => (Uuids.timeBased(), udt._1.asCql(true),
+ UDTDisplay.formatWithoutMenu(udt._2, withCaption = false)))
+
+ val functionDetails: Seq[(UUID, String, String)] = meta.getFunctions.asScala.toSeq
+ .sortBy(_._1.getName.asCql(true))
+ .map(f => (Uuids.timeBased(), f._1.getName.asCql(true), FunctionDisplay.formatWithoutMenu(List(f._2),
+ withCaption = false)))
+
+ val aggregateDetails: Seq[(UUID, String, String)] = meta.getAggregates.asScala.toSeq
+ .sortBy(_._1.getName.asCql(true))
+ .map(a => (Uuids.timeBased(), a._1.getName.asCql(true), AggregateDisplay.formatWithoutMenu(List(a._2),
+ withCaption = false, codecRegistry)))
val ksContent: KeyspaceContent = KeyspaceContent(ksName, ksDetails, tableDetails, viewDetails,
udtDetails, functionDetails, aggregateDetails)
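
Note on `formatCQLQuery` in the hunk above: a minimal sketch of what the three `replaceAll` calls do, run against a hand-written statement. The input string is an assumption made for illustration; the diff does not show whether the 4.x `describe(true)` output has exactly this casing and spacing, and the patterns only take effect when it does.

```scala
object FormatCqlSketch {
  // Body copied from the diff; made public here so that it can be called directly.
  def formatCQLQuery(cql: String): String = {
    cql.replaceAll(""" WITH REPLICATION = \{""", " WITH REPLICATION = \\{")
      .replaceAll("('[^']+'\\s*:\\s+'[^']+',?)", "\n\t$1")
      .replaceAll(""" \} AND DURABLE_WRITES = """, " \\}\nAND DURABLE_WRITES = ")
  }

  // Hypothetical input, written by hand for this sketch.
  val sample: String =
    "CREATE KEYSPACE ks WITH REPLICATION = { 'class' : 'SimpleStrategy', " +
      "'replication_factor' : '1' } AND DURABLE_WRITES = true;"

  def main(args: Array[String]): Unit = {
    // Each 'key' : 'value' pair moves to its own tab-indented line,
    // and the DURABLE_WRITES clause starts a new line.
    println(formatCQLQuery(sample))
  }
}
```

The first call replaces the matched text with itself, so only the second and third calls change the string.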
@@ -181,25 +200,33 @@ object DisplaySystem {
}
}
+ // cluster is explicitly passed because of the limitations of the driver.
+ // TODO(alex): remove it after driver is fixed
object ClusterDisplay {
- def formatClusterOnly(statement: String, meta: Metadata, withMenu: Boolean = true): String = {
- val clusterDetails: ClusterDetails = ClusterDetails(meta.getClusterName, meta.getPartitioner)
+
+ def formatClusterOnly(statement: String, meta: Metadata,
+ withMenu: Boolean = true): String = {
+ val partitioner: String = if (meta.getTokenMap.isPresent)
+ meta.getTokenMap.get().getPartitionerName
+ else
+ "Unknown partitioner"
+ val clusterDetails: ClusterDetails = ClusterDetails(clusterName(meta), partitioner)
val content: String = engine.layout(CLUSTER_DETAILS_TEMPLATE,
Map[String, Any]("clusterDetails" -> clusterDetails))
- if(withMenu) MenuDisplay.formatMenu(statement) + content else content
+ if (withMenu) MenuDisplay.formatMenu(statement) + content else content
}
def formatClusterContent(statement: String, meta: Metadata): String = {
- val clusterName: String = meta.getClusterName
- val clusterDetails: String = formatClusterOnly(statement, meta, false)
+ val clusterDetails: String = formatClusterOnly(statement, meta, withMenu = false)
- val keyspaceDetails: List[(UUID, String, String)] = meta.getKeyspaces.asScala.toList
- .sortBy(ks => ks.getName)
- .map(ks => (UUIDs.timeBased(), ks.getName, KeyspaceDisplay.formatKeyspaceOnly(ks, false)))
+ val keyspaceDetails: Seq[(UUID, String, String)] = meta.getKeyspaces.asScala.toSeq
+ .sortBy(_._1.asCql(true))
+ .map(ks => (Uuids.timeBased(), ks._1.asCql(true),
+ KeyspaceDisplay.formatKeyspaceOnly(ks._2, withCaption = false)))
- val clusterContent: ClusterContent = ClusterContent(clusterName, clusterDetails, keyspaceDetails)
+ val clusterContent: ClusterContent = ClusterContent(clusterName(meta), clusterDetails, keyspaceDetails)
MenuDisplay.formatMenuForCluster(statement, clusterContent) +
engine.layout(CLUSTER_CONTENT_TEMPLATE,
@@ -207,26 +234,26 @@ object DisplaySystem {
}
def formatAllTables(statement: String, meta: Metadata): String = {
- val ksMetas: List[KeyspaceMetadata] = meta.getKeyspaces.asScala.toList
- .filter(_.getTables.size > 0)
- .sortBy(ks => ks.getName)
- if(ksMetas.isEmpty) {
+ val ksMetas: Seq[KeyspaceMetadata] = meta.getKeyspaces.asScala.toSeq
+ .filter(_._2.getTables.size > 0)
+ .sortBy(_._1.asCql(true))
+ .map(_._2)
+ if (ksMetas.isEmpty) {
NoResultDisplay.formatNoResult
} else {
- val allTables: Map[(UUID, String), List[String]] = ListMap.empty ++
+ val allTables: Map[(UUID, String), Seq[String]] = ListMap.empty ++
ksMetas
.map(ks => {
- ((UUIDs.timeBased(), ks.getName),
- ks.getTables.asScala.toList.map(table => table.getName).sortBy(name => name))
+ ((Uuids.timeBased(), ks.getName.asCql(true)),
+ ks.getTables.asScala.toList.map(_._1.asCql(true)).sorted)
})
- .sortBy{case ((id,name), _) => name}
+ .sortBy { case ((_, name), _) => name }
+ val keyspaceDetails: Seq[(UUID, String, String)] = allTables
+ .keySet.toSeq.sortBy { case (_, ksName) => ksName }
+ .map { case (id, ksName) => (id, ksName, "") }
- val keyspaceDetails: List[(UUID, String, String)] = allTables
- .keySet.toList.sortBy{case(id,ksName) => ksName}
- .map{case(id,ksName) => (id,ksName, "")}
-
- val clusterContent: ClusterContent = ClusterContent(meta.getClusterName, "", keyspaceDetails)
+ val clusterContent: ClusterContent = ClusterContent(clusterName(meta), "", keyspaceDetails)
MenuDisplay.formatMenuForCluster(statement, clusterContent) +
engine.layout(ALL_TABLES_TEMPLATE,
@@ -235,27 +262,27 @@ object DisplaySystem {
}
def formatAllUDTs(statement: String, meta: Metadata): String = {
- val ksMetas: List[KeyspaceMetadata] = meta.getKeyspaces.asScala.toList
- .filter(_.getUserTypes.size > 0)
- .sortBy(ks => ks.getName)
+ val ksMetas: Seq[KeyspaceMetadata] = meta.getKeyspaces.asScala.toSeq
+ .filter(_._2.getUserDefinedTypes.size > 0)
+ .sortBy(_._1.asCql(false))
+ .map(_._2)
- if(ksMetas.isEmpty) {
+ if (ksMetas.isEmpty) {
NoResultDisplay.formatNoResult
} else {
- val allUDTs: Map[(UUID, String), List[String]] = ListMap.empty ++
+ val allUDTs: Map[(UUID, String), Seq[String]] = ListMap.empty ++
ksMetas
- .map(ks => {
- ((UUIDs.timeBased(), ks.getName),
- ks.getUserTypes.asScala.toList.map(udt => udt.getTypeName).sortBy(name => name))
+ .map ( ks => {
+ ((Uuids.timeBased(), ks.getName.asCql(true)),
+ ks.getUserDefinedTypes.asScala.toSeq.map(_._1.asCql(true)).sorted)
})
- .sortBy { case ((id, name), _) => name }
-
+ .sortBy { case ((_, name), _) => name }
val keyspaceDetails: List[(UUID, String, String)] = allUDTs
- .keySet.toList.sortBy { case (id, ksName) => ksName }
+ .keySet.toList.sortBy { case (_, ksName) => ksName }
.map { case (id, ksName) => (id, ksName, "") }
- val clusterContent: ClusterContent = ClusterContent(meta.getClusterName, "", keyspaceDetails)
+ val clusterContent: ClusterContent = ClusterContent(clusterName(meta), "", keyspaceDetails)
MenuDisplay.formatMenuForCluster(statement, clusterContent) +
engine.layout(ALL_UDTS_TEMPLATE,
@@ -264,29 +291,29 @@ object DisplaySystem {
}
def formatAllFunctions(statement: String, meta: Metadata): String = {
- val ksMetas: List[KeyspaceMetadata] = meta.getKeyspaces.asScala.toList
- .filter(_.getFunctions.size > 0)
- .sortBy(ks => ks.getName)
+ val ksMetas: Seq[KeyspaceMetadata] = meta.getKeyspaces.asScala.toSeq
+ .filter(_._2.getFunctions.size > 0)
+ .sortBy(_._1.asCql(true))
+ .map(_._2)
- if(ksMetas.isEmpty) {
+ if (ksMetas.isEmpty) {
NoResultDisplay.formatNoResult
} else {
- val allFunctions: Map[(UUID, String), List[FunctionSummary]] = ListMap.empty ++
+ val allFunctions: Map[(UUID, String), Seq[FunctionSummary]] =
ksMetas
.map(ks => {
- ((UUIDs.timeBased(), ks.getName),
- ks.getFunctions.asScala.toList
- .map(MetaDataConverter.functionMetaToFunctionSummary(_))
+ ((Uuids.timeBased(), ks.getName.asCql(true)),
+ ks.getFunctions.asScala
+ .map { case (sig, meta) => MetaDataConverter.functionMetaToFunctionSummary(sig, meta) }
+ .toSeq
.sortBy(_.name))
- })
- .sortBy { case ((id, name), _) => name }
+ }).toMap
val keyspaceDetails: List[(UUID, String, String)] = allFunctions
- .keySet.toList.sortBy { case (id, ksName) => ksName }
+ .keySet.toList.sortBy { case (_, ksName) => ksName }
.map { case (id, ksName) => (id, ksName, "") }
-
- val clusterContent: ClusterContent = ClusterContent(meta.getClusterName, "", keyspaceDetails)
+ val clusterContent: ClusterContent = ClusterContent(clusterName(meta), "", keyspaceDetails)
MenuDisplay.formatMenuForCluster(statement, clusterContent) +
engine.layout(ALL_FUNCTIONS_TEMPLATE,
@@ -295,29 +322,28 @@ object DisplaySystem {
}
def formatAllAggregates(statement: String, meta: Metadata): String = {
- val ksMetas: List[KeyspaceMetadata] = meta.getKeyspaces.asScala.toList
- .filter(_.getAggregates.size > 0)
- .sortBy(ks => ks.getName)
+ val ksMetas = meta.getKeyspaces.asScala.toList
+ .filter(_._2.getAggregates.size > 0)
+ .sortBy(_._1.asCql(false))
- if(ksMetas.isEmpty) {
+ if (ksMetas.isEmpty) {
NoResultDisplay.formatNoResult
} else {
- val allAggregates: Map[(UUID, String), List[AggregateSummary]] = ListMap.empty ++
+ val allAggregates: Map[(UUID, String), Seq[AggregateSummary]] =
ksMetas
- .map(ks => {
- ((UUIDs.timeBased(), ks.getName),
- ks.getAggregates.asScala.toList
- .map(MetaDataConverter.aggregateMetaToAggregateSummary(_))
+ .map { case (id, ks) =>
+ ((Uuids.timeBased(), id.asCql(true)),
+ ks.getAggregates.asScala
+ .map { case (sig, meta) => MetaDataConverter.aggregateMetaToAggregateSummary(sig, meta) }
+ .toSeq
.sortBy(_.name))
- })
- .sortBy { case ((id, name), _) => name }
+ }.toMap
val keyspaceDetails: List[(UUID, String, String)] = allAggregates
.keySet.toList.sortBy { case (id, ksName) => ksName }
.map { case (id, ksName) => (id, ksName, "") }
-
- val clusterContent: ClusterContent = ClusterContent(meta.getClusterName, "", keyspaceDetails)
+ val clusterContent: ClusterContent = ClusterContent(clusterName(meta), "", keyspaceDetails)
MenuDisplay.formatMenuForCluster(statement, clusterContent) +
engine.layout(ALL_AGGREGATES_TEMPLATE,
@@ -326,29 +352,28 @@ object DisplaySystem {
}
def formatAllMaterializedViews(statement: String, meta: Metadata): String = {
- val ksMetas: List[KeyspaceMetadata] = meta.getKeyspaces.asScala.toList
- .filter(_.getMaterializedViews.size > 0)
- .sortBy(ks => ks.getName)
+ val ksMetas: Seq[KeyspaceMetadata] = meta.getKeyspaces.asScala.toSeq
+ .filter(_._2.getViews.size > 0)
+ .sortBy(_._1.asCql(false))
+ .map(_._2)
- if(ksMetas.isEmpty) {
+ if (ksMetas.isEmpty) {
NoResultDisplay.formatNoResult
} else {
- val allMVs: Map[(UUID, String), List[MaterializedViewSummary]] = ListMap.empty ++
+ val allMVs: Map[(UUID, String), Seq[MaterializedViewSummary]] = ListMap.empty ++
ksMetas
- .map(ks => {
- ((UUIDs.timeBased(), ks.getName),
- ks.getMaterializedViews.asScala.toList
- .map(MetaDataConverter.mvMetaToMaterializedViewSummary(_))
- .sortBy(_.name))
- })
- .sortBy { case ((id, name), _) => name }
-
- val keyspaceDetails: List[(UUID, String, String)] = allMVs
- .keySet.toList.sortBy { case (id, ksName) => ksName }
+ .map( ks =>
+ ((Uuids.timeBased(), ks.getName.asCql(true)),
+ ks.getViews.asScala.values.toSeq
+ .map(MetaDataConverter.mvMetaToMaterializedViewSummary)
+ .sortBy(_.name)))
+ .sortBy { case ((_, name), _) => name }
+
+ val keyspaceDetails: Seq[(UUID, String, String)] = allMVs
+ .keySet.toList.sortBy { case (_, ksName) => ksName }
.map { case (id, ksName) => (id, ksName, "") }
-
- val clusterContent: ClusterContent = ClusterContent(meta.getClusterName, "", keyspaceDetails)
+ val clusterContent: ClusterContent = ClusterContent(clusterName(meta), "", keyspaceDetails)
MenuDisplay.formatMenuForCluster(statement, clusterContent) +
engine.layout(ALL_MATERIALIZED_VIEWS_TEMPLATE,
@@ -368,16 +393,20 @@ object DisplaySystem {
val formatNoResult: String = engine.layout("/scalate/noResult.ssp")
+ def nodeToString(node: Node): String = {
+ node.getEndPoint.toString
+ }
+
def noResultWithExecutionInfo(lastQuery: String, execInfo: ExecutionInfo): String = {
- val consistency = Option(execInfo.getAchievedConsistencyLevel).getOrElse("N/A")
- val queriedHosts = execInfo.getQueriedHost.toString.replaceAll("/","").replaceAll("""\[""","").replaceAll("""\]""","")
- val triedHosts = execInfo.getTriedHosts.toString.replaceAll("/","").replaceAll("""\[""","").replaceAll("""\]""","")
+ val queriedHosts = nodeToString(execInfo.getCoordinator)
+ val triedHosts = (execInfo.getErrors.asScala.map(x => nodeToString(x.getKey))
+ .toSet + queriedHosts).mkString(", ")
val schemaInAgreement = Option(execInfo.isSchemaInAgreement).map(_.toString).getOrElse("N/A")
engine.layout("/scalate/noResultWithExecutionInfo.ssp",
- Map[String,Any]("query" -> lastQuery, "consistency" -> consistency,
- "triedHosts" -> triedHosts, "queriedHosts" -> queriedHosts,
- "schemaInAgreement" -> schemaInAgreement))
+ Map[String, Any]("query" -> lastQuery,
+ "triedHosts" -> triedHosts, "queriedHosts" -> queriedHosts,
+ "schemaInAgreement" -> schemaInAgreement))
}
}
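
Note on `noResultWithExecutionInfo` above: the new code builds the "Tried Hosts" value from the nodes that appear in `execInfo.getErrors` plus the coordinator. A minimal sketch of that set union, assuming plain strings in place of `Node` and a hypothetical `(node, message)` pair in place of the driver's error entries:

```scala
object TriedHostsSketch {
  // Hypothetical stand-in: each error is a (node, message) pair,
  // and the coordinator is already rendered as a string.
  def triedHosts(coordinator: String, errors: Seq[(String, String)]): String = {
    // A Set removes duplicates, so the coordinator is listed once
    // even if it also appears among the failed nodes.
    (errors.map(_._1).toSet + coordinator).mkString(", ")
  }

  def main(args: Array[String]): Unit = {
    val errors = Seq("10.0.0.2:9042" -> "timeout", "10.0.0.1:9042" -> "overloaded")
    println(triedHosts("10.0.0.1:9042", errors))
  }
}
```

Because the hosts pass through a `Set`, this code does not itself establish the order of the rendered list.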
@@ -402,6 +431,7 @@ object DisplaySystem {
formatMenu(statement, dropDownMenu)
}
}
+
}
class ColumnMetaWrapper(val columnMeta: ColumnMetadata) {
@@ -413,7 +443,11 @@ class ColumnMetaWrapper(val columnMeta: ColumnMetadata) {
case _ => false
}
- override def hashCode: Int = columnMeta.getName.hashCode
+ override def hashCode: Int = columnMeta.getName.asCql(true).hashCode
+}
+
+object ColumnMetaWrapper {
+ def apply(columnMeta: ColumnMetadata): ColumnMetaWrapper = new ColumnMetaWrapper(columnMeta)
}
/**
@@ -423,153 +457,156 @@ class ColumnMetaWrapper(val columnMeta: ColumnMetadata) {
*/
object MetaDataConverter {
- type DriverClusteringOrder = com.datastax.driver.core.ClusteringOrder
+ type DriverClusteringOrder = com.datastax.oss.driver.api.core.metadata.schema.ClusteringOrder
def functionMetaToFunctionDetails(function: FunctionMetadata): FunctionDetails = {
- new FunctionDetails(function.getKeyspace.getName,
- function.getSimpleName,
- function.getArguments.asScala
- .toMap
- .map{case(paramName, dataType) => paramName + " " + dataType.asFunctionParameterString()}
+ val signature = function.getSignature
+ FunctionDetails(function.getKeyspace.asCql(true),
+ signature.getName.asCql(true),
+ function.getParameterNames.asScala.zip(signature.getParameterTypes.asScala)
+ .map { case (paramName, dataType) => paramName + " " + dataType.asCql(true, true) }
.toList,
function.isCalledOnNullInput,
- function.getReturnType.asFunctionParameterString(),
+ function.getReturnType.asCql(true, true),
function.getLanguage,
function.getBody,
- function.exportAsString())
+ function.describe(true))
}
- def functionMetaToFunctionSummary(function: FunctionMetadata): FunctionSummary = {
- new FunctionSummary(function.getKeyspace.getName,
- function.getSimpleName,
- function.getArguments.asScala.toMap
- .mapValues(dataType => dataType.asFunctionParameterString())
- .values.toList,
- function.getReturnType.asFunctionParameterString()
+ def functionMetaToFunctionSummary(signature: FunctionSignature, function: FunctionMetadata): FunctionSummary = {
+ val signature = function.getSignature
+ FunctionSummary(function.getKeyspace.asCql(true),
+ signature.getName.asCql(true),
+ function.getParameterNames.asScala.zip(signature.getParameterTypes.asScala)
+ .map { case (paramName, dataType) => paramName + " " + dataType.asCql(true, true) }
+ .toList,
+ function.getReturnType.asCql(true, true)
)
}
def aggregateMetaToAggregateDetails(codecRegistry: CodecRegistry, aggregate: AggregateMetadata): AggregateDetails = {
- val sFunc: FunctionSummary = functionMetaToFunctionSummary(aggregate.getStateFunc)
- val finalFunc: Option[String] = Option(aggregate.getFinalFunc).map(func => {
- val finalFunction = functionMetaToFunctionSummary(func)
- finalFunction.name + finalFunction.arguments.mkString("(", ", ", ")")
- })
val sType = aggregate.getStateType
- val initCond: Option[String] = Option(aggregate.getInitCond).map(codecRegistry.codecFor(sType).format(_))
- val returnType: String = Option(aggregate.getFinalFunc) match {
- case Some(finalFunc) => functionMetaToFunctionSummary(finalFunc).returnType
- case None => sFunc.returnType
- }
-
- new AggregateDetails(aggregate.getKeyspace.getName,
- aggregate.getSimpleName,
- aggregate.getArgumentTypes.asScala.toList.map(_.asFunctionParameterString()),
- sFunc.name + sFunc.arguments.mkString("(",", ", ")"),
- sType.asFunctionParameterString(),
+ val initCond: Option[String] = Option(aggregate.getInitCond)
+ .map(value => codecRegistry.codecFor(sType).format(value.get))
+ val returnType = aggregate.getReturnType
+ val finalFunc: Option[String] = aggregate.getFinalFuncSignature.asScala
+ .map(x => x.getName.asCql(true) +
+ x.getParameterTypes.asScala.map(_.asCql(true, true)).mkString("(", ", ", ")"))
+
+ val sFunc = aggregate.getStateFuncSignature
+ val sFuncString = sFunc.getName.asCql(true) + sFunc.getParameterTypes.asScala
+ .map(_.asCql(true, true)).mkString("(", ", ", ")")
+
+ AggregateDetails(aggregate.getKeyspace.asCql(true),
+ aggregate.getSignature.getName.asCql(true),
+ aggregate.getSignature.getParameterTypes.asScala.map(_.asCql(true, true)),
+ sFuncString,
+ sType.asCql(true, true),
finalFunc,
initCond,
- returnType,
- aggregate.exportAsString())
+ returnType.asCql(true, true),
+ aggregate.describe(true)
+ )
}
- def aggregateMetaToAggregateSummary(aggregate: AggregateMetadata): AggregateSummary = {
- val returnType: String = Option(aggregate.getFinalFunc) match {
- case Some(finalFunc) => functionMetaToFunctionSummary(finalFunc).returnType
- case None => aggregate.getStateType.asFunctionParameterString()
- }
+ def aggregateMetaToAggregateSummary(signature: FunctionSignature, aggregate: AggregateMetadata): AggregateSummary = {
+ val returnType: String = aggregate.getReturnType.asCql(true, true)
- new AggregateSummary(aggregate.getKeyspace.getName,
- aggregate.getSimpleName,
- aggregate.getArgumentTypes.asScala.toList.map(_.asFunctionParameterString()),
+ AggregateSummary(aggregate.getKeyspace.asCql(true),
+ signature.getName.asCql(true),
+ signature.getParameterTypes.asScala.map( _.asCql(true, true)),
returnType
)
}
- def mvMetaToMaterializedViewDetails(mv: MaterializedViewMetadata): MaterializedViewDetails = {
- new MaterializedViewDetails(mv.getName, MV.mvMetaToColumnDetails(mv), mv.exportAsString(), mv.getBaseTable.getName)
+ def mvMetaToMaterializedViewDetails(mv: ViewMetadata): MaterializedViewDetails = {
+ MaterializedViewDetails(mv.getName.asCql(true), MV.mvMetaToColumnDetails(mv), mv.describe(true),
+ mv.getBaseTable.asCql(true))
}
- def mvMetaToMaterializedViewSummary(mv: MaterializedViewMetadata): MaterializedViewSummary = {
- new MaterializedViewSummary(mv.getName, mv.getBaseTable.getName)
+ def mvMetaToMaterializedViewSummary(mv: ViewMetadata): MaterializedViewSummary = {
+ MaterializedViewSummary(mv.getName.asCql(true), mv.getBaseTable.asCql(true))
}
trait TableOrView {
- protected def extractNormalColumns(columns: List[ColumnMetaWrapper]): List[ColumnDetails] = {
- columns
- .filter(_.columnMeta.isStatic == false)
- .map(c => new ColumnDetails(c.columnMeta.getName, NormalColumn, c.columnMeta.getType))
+ protected def extractNormalColumns(columns: Seq[ColumnMetaWrapper]): Seq[ColumnDetails] = {
+ columns.filter(_.columnMeta.isStatic == false)
+ .map(c => ColumnDetails(c.columnMeta.getName.asCql(true), NormalColumn, c.columnMeta.getType))
}
- protected def extractStaticColumns(columns: List[ColumnMetaWrapper]): List[ColumnDetails] = {
- columns
- .filter(_.columnMeta.isStatic == true)
- .map(c => new ColumnDetails(c.columnMeta.getName, StaticColumn, c.columnMeta.getType))
+ protected def extractStaticColumns(columns: Seq[ColumnMetaWrapper]): Seq[ColumnDetails] = {
+ columns.filter(_.columnMeta.isStatic == true)
+ .map(c => ColumnDetails(c.columnMeta.getName.asCql(true), StaticColumn, c.columnMeta.getType))
}
- protected def convertClusteringColumns(columns: List[ColumnMetaWrapper], orders: List[DriverClusteringOrder]): List[ColumnDetails] = {
- columns
- .zip(orders)
- .map{case(c,order) => new ColumnDetails(c.columnMeta.getName,
- new ClusteringColumn(OrderConverter.convert(order)),c.columnMeta.getType)}
-
+ protected def convertClusteringColumns(columns: Seq[ColumnMetaWrapper], orders: Seq[DriverClusteringOrder]): Seq[ColumnDetails] = {
+ columns.zip(orders)
+ .map { case (c, order) => ColumnDetails(c.columnMeta.getName.asCql(true),
+ ClusteringColumn(OrderConverter.convert(order)), c.columnMeta.getType)
+ }
}
- protected def convertPartitionKeys(columns: List[ColumnMetaWrapper]): List[ColumnDetails] = {
+ protected def convertPartitionKeys(columns: Seq[ColumnMetaWrapper]): Seq[ColumnDetails] = {
columns
- .map(c => new ColumnDetails(c.columnMeta.getName, PartitionKey, c.columnMeta.getType))
+ .map(c => ColumnDetails(c.columnMeta.getName.asCql(true), PartitionKey, c.columnMeta.getType))
}
- }
- object Table extends TableOrView {
- def tableMetaToColumnDetails(meta: TableMetadata): List[ColumnDetails] = {
- val partitionKeys: List[ColumnMetaWrapper] = meta.getPartitionKey.asScala.toList.map(new ColumnMetaWrapper(_))
- val clusteringColumns: List[ColumnMetaWrapper] = meta.getClusteringColumns.asScala.toList.map(new ColumnMetaWrapper(_))
- val columns: List[ColumnMetaWrapper] = meta.getColumns.asScala.toList.map(new ColumnMetaWrapper(_))
- .diff(partitionKeys).diff(clusteringColumns)
- val clusteringOrders = meta.getClusteringOrder.asScala.toList
-
- convertPartitionKeys(partitionKeys):::
- extractStaticColumns(columns):::
- convertClusteringColumns(clusteringColumns, clusteringOrders):::
+ def relationMetaToColumnDetails(meta: RelationMetadata): Seq[ColumnDetails] = {
+ val partitionKeys: Seq[ColumnMetaWrapper] = meta.getPartitionKey.asScala.map(ColumnMetaWrapper(_))
+
+ // mutable structures are used to keep the order of the columns - it's lost if we're doing .asScala, etc.
+ val clusteringColumnsRaw = meta.getClusteringColumns
+ val clusteringColumns = new Array[ColumnMetaWrapper](clusteringColumnsRaw.size())
+ val clusteringOrders = new Array[DriverClusteringOrder](clusteringColumnsRaw.size())
+ var i = 0
+ for (meta <- clusteringColumnsRaw.keySet().iterator().asScala) {
+ clusteringColumns(i) = ColumnMetaWrapper(meta)
+ clusteringOrders(i) = clusteringColumnsRaw.get(meta)
+ i += 1
+ }
+
+ val primaryKeyNames = meta.getPrimaryKey.asScala.map(_.getName).toSet
+ val columnsRaw = meta.getColumns
+ val columns = new Array[ColumnMetaWrapper](columnsRaw.size() - primaryKeyNames.size)
+ i = 0
+ for (col <- columnsRaw.keySet().iterator().asScala) {
+ if (!primaryKeyNames.contains(col)) {
+ columns(i) = ColumnMetaWrapper(columnsRaw.get(col))
+ i += 1
+ }
+ }
+
+ convertPartitionKeys(partitionKeys) ++
+ extractStaticColumns(columns) ++
+ convertClusteringColumns(clusteringColumns, clusteringOrders) ++
extractNormalColumns(columns)
}
- def tableMetaToIndexDetails(meta: TableMetadata): List[IndexDetails] = {
- meta.getIndexes.asScala.toList
- .map(index => IndexDetails(index.getName, index.getTarget, index.asCQLQuery()))
- .sortBy(index => index.name)
- }
+ }
+ object Table extends TableOrView {
+ def tableMetaToColumnDetails(meta: TableMetadata): Seq[ColumnDetails] = {
+ relationMetaToColumnDetails(meta)
+ }
+ def tableMetaToIndexDetails(meta: TableMetadata): Seq[IndexDetails] = {
+ meta.getIndexes.asScala.map {
+ case (name: CqlIdentifier, index: IndexMetadata) =>
+ IndexDetails(name.asCql(true), index.getTarget, index.describe(true))
+ }.toSeq.sortBy(_.name)
+ }
}
object MV extends TableOrView {
- def mvMetaToColumnDetails(meta: MaterializedViewMetadata): List[ColumnDetails] = {
- val partitionKeys: List[ColumnMetaWrapper] = meta.getPartitionKey.asScala.toList.map(new ColumnMetaWrapper(_))
- val clusteringColumns: List[ColumnMetaWrapper] = meta.getClusteringColumns.asScala.toList.map(new ColumnMetaWrapper(_))
- val columns: List[ColumnMetaWrapper] = meta.getColumns.asScala.toList.map(new ColumnMetaWrapper(_))
- .diff(partitionKeys).diff(clusteringColumns)
- val clusteringOrders = meta.getClusteringOrder.asScala.toList
-
- convertPartitionKeys(partitionKeys):::
- convertClusteringColumns(clusteringColumns, clusteringOrders):::
- extractNormalColumns(columns)
+ def mvMetaToColumnDetails(meta: ViewMetadata): Seq[ColumnDetails] = {
+ relationMetaToColumnDetails(meta)
}
}
object UDT {
- def userTypeToColumnDetails(userType: UserType): List[ColumnDetails] = {
- userType.getFieldNames.asScala.toList
- .map(name => new ColumnDetails(name, NormalColumn, userType.getFieldType(name)))
+ def userTypeToColumnDetails(userType: UserDefinedType): Seq[ColumnDetails] = {
+ userType.getFieldNames.asScala.zip(userType.getFieldTypes.asScala)
+ .map { case (name, typ) => ColumnDetails(name.asCql(true), NormalColumn, typ) }
}
}
-
-
-
}
-
-
-
-
-
diff --git a/cassandra/src/main/scala/org/apache/zeppelin/cassandra/EnhancedSession.scala b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/EnhancedSession.scala
index 988cdd2..31f5142 100644
--- a/cassandra/src/main/scala/org/apache/zeppelin/cassandra/EnhancedSession.scala
+++ b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/EnhancedSession.scala
@@ -18,20 +18,23 @@ package org.apache.zeppelin.cassandra
import java.util.regex.Pattern
-import com.datastax.driver.core._
+import com.datastax.oss.driver.api.core.CqlSession
+import com.datastax.oss.driver.api.core.cql.{BatchStatement, BatchType, BoundStatement, ExecutionInfo, ResultSet, SimpleStatement, Statement}
+import com.datastax.oss.driver.api.core.metadata.Metadata
+import com.datastax.oss.driver.api.core.metadata.schema.{AggregateMetadata, FunctionMetadata, KeyspaceMetadata, ViewMetadata}
import org.apache.zeppelin.cassandra.TextBlockHierarchy._
import org.apache.zeppelin.interpreter.InterpreterException
import org.slf4j.LoggerFactory
import scala.collection.JavaConverters._
+import scala.compat.java8.OptionConverters._
/**
* Enhance the Java driver session
* with special statements
* to describe schema
*/
-class EnhancedSession(val session: Session) {
-
+class EnhancedSession(val session: CqlSession) {
val clusterDisplay = DisplaySystem.ClusterDisplay
val keyspaceDisplay = DisplaySystem.KeyspaceDisplay
val tableDisplay = DisplaySystem.TableDisplay
@@ -41,7 +44,8 @@ class EnhancedSession(val session: Session) {
val materializedViewDisplay = DisplaySystem.MaterializedViewDisplay
val helpDisplay = DisplaySystem.HelpDisplay
private val noResultDisplay = DisplaySystem.NoResultDisplay
- private val DEFAULT_CHECK_TIME = 200 // half second
+ private val DEFAULT_CHECK_TIME: Int = 200 // 200 milliseconds
+ private val MAX_SCHEMA_AGREEMENT_WAIT: Int = 120000 // 120 seconds
private val LOGGER = LoggerFactory.getLogger(classOf[EnhancedSession])
val HTML_MAGIC = "%html \n"
@@ -53,130 +57,132 @@ class EnhancedSession(val session: Session) {
}
private def execute(describeCluster: DescribeClusterCmd): String = {
- val metaData = session.getCluster.getMetadata
+ val metaData = session.getMetadata
HTML_MAGIC + clusterDisplay.formatClusterOnly(describeCluster.statement, metaData)
}
private def execute(describeKeyspaces: DescribeKeyspacesCmd): String = {
- val metaData = session.getCluster.getMetadata
+ val metaData = session.getMetadata
HTML_MAGIC + clusterDisplay.formatClusterContent(describeKeyspaces.statement, metaData)
}
private def execute(describeTables: DescribeTablesCmd): String = {
- val metadata: Metadata = session.getCluster.getMetadata
+ val metadata = session.getMetadata
HTML_MAGIC + clusterDisplay.formatAllTables(describeTables.statement,metadata)
}
private def execute(describeKeyspace: DescribeKeyspaceCmd): String = {
val keyspace: String = describeKeyspace.keyspace
- val metadata: Option[KeyspaceMetadata] = Option(session.getCluster.getMetadata.getKeyspace(keyspace))
+ val metadata: Option[KeyspaceMetadata] = session.getMetadata.getKeyspace(keyspace).asScala
metadata match {
case Some(ksMeta) => HTML_MAGIC + keyspaceDisplay.formatKeyspaceContent(describeKeyspace.statement, ksMeta,
- session.getCluster.getConfiguration.getCodecRegistry)
+ session.getContext.getCodecRegistry)
case None => throw new InterpreterException(s"Cannot find keyspace $keyspace")
}
}
+ private def getKeySpace(session: CqlSession): String = {
+ session.getKeyspace.asScala.map(_.asCql(true)).getOrElse("system")
+ }
+
private def execute(describeTable: DescribeTableCmd): String = {
- val metaData = session.getCluster.getMetadata
+ val metaData = session.getMetadata
val tableName: String = describeTable.table
- val keyspace: String = describeTable.keyspace.orElse(Option(session.getLoggedKeyspace)).getOrElse("system")
+ val keyspace: String = describeTable.keyspace.getOrElse(getKeySpace(session))
- Option(metaData.getKeyspace(keyspace)).flatMap(ks => Option(ks.getTable(tableName))) match {
- case Some(tableMeta) => HTML_MAGIC + tableDisplay.format(describeTable.statement, tableMeta, true)
+ metaData.getKeyspace(keyspace).asScala.flatMap(ks => ks.getTable(tableName).asScala) match {
+ case Some(tableMeta) => HTML_MAGIC + tableDisplay.format(describeTable.statement, tableMeta, withCaption = true)
case None => throw new InterpreterException(s"Cannot find table $keyspace.$tableName")
}
}
private def execute(describeUDT: DescribeTypeCmd): String = {
- val metaData = session.getCluster.getMetadata
- val keyspace: String = describeUDT.keyspace.orElse(Option(session.getLoggedKeyspace)).getOrElse("system")
+ val metaData = session.getMetadata
+ val keyspace: String = describeUDT.keyspace.getOrElse(getKeySpace(session))
val udtName: String = describeUDT.udtName
- Option(metaData.getKeyspace(keyspace)).flatMap(ks => Option(ks.getUserType(udtName))) match {
- case Some(userType) => HTML_MAGIC + udtDisplay.format(describeUDT.statement, userType, true)
+ metaData.getKeyspace(keyspace).asScala.flatMap(ks => ks.getUserDefinedType(udtName).asScala) match {
+ case Some(userType) => HTML_MAGIC + udtDisplay.format(describeUDT.statement, userType, withCaption = true)
case None => throw new InterpreterException(s"Cannot find type $keyspace.$udtName")
}
}
private def execute(describeUDTs: DescribeTypesCmd): String = {
- val metadata: Metadata = session.getCluster.getMetadata
+ val metadata: Metadata = session.getMetadata
HTML_MAGIC + clusterDisplay.formatAllUDTs(describeUDTs.statement, metadata)
}
private def execute(describeFunction: DescribeFunctionCmd): String = {
- val metaData = session.getCluster.getMetadata
- val keyspaceName: String = describeFunction.keyspace.orElse(Option(session.getLoggedKeyspace)).getOrElse("system")
- val functionName: String = describeFunction.function;
+ val metaData = session.getMetadata
+ val keyspaceName: String = describeFunction.keyspace.getOrElse(getKeySpace(session))
+ val functionName: String = describeFunction.function
- Option(metaData.getKeyspace(keyspaceName)) match {
- case Some(keyspace) => {
- val functionMetas: List[FunctionMetadata] = keyspace.getFunctions.asScala.toList
- .filter(func => func.getSimpleName.toLowerCase == functionName.toLowerCase)
+ metaData.getKeyspace(keyspaceName).asScala match {
+ case Some(keyspace) =>
+ val functionMetas: Seq[FunctionMetadata] = keyspace.getFunctions.asScala.toSeq
+ .filter { case (sig, _) => sig.getName.asCql(true).toLowerCase == functionName.toLowerCase }
+ .map(_._2)
if(functionMetas.isEmpty) {
- throw new InterpreterException(s"Cannot find function ${keyspaceName}.$functionName")
- } else {
- HTML_MAGIC + functionDisplay.format(describeFunction.statement, functionMetas, true)
+ throw new InterpreterException(s"Cannot find function $keyspaceName.$functionName")
}
- }
- case None => throw new InterpreterException(s"Cannot find function ${keyspaceName}.$functionName")
+ HTML_MAGIC + functionDisplay.format(describeFunction.statement, functionMetas, withCaption = true)
+
+ case None =>
+ throw new InterpreterException(s"Cannot find function $keyspaceName.$functionName")
}
}
private def execute(describeFunctions: DescribeFunctionsCmd): String = {
- val metadata: Metadata = session.getCluster.getMetadata
+ val metadata: Metadata = session.getMetadata
HTML_MAGIC + clusterDisplay.formatAllFunctions(describeFunctions.statement, metadata)
}
private def execute(describeAggregate: DescribeAggregateCmd): String = {
- val metaData = session.getCluster.getMetadata
- val keyspaceName: String = describeAggregate.keyspace.orElse(Option(session.getLoggedKeyspace)).getOrElse("system")
- val aggregateName: String = describeAggregate.aggregate;
+ val metaData = session.getMetadata
+ val keyspaceName: String = describeAggregate.keyspace.getOrElse(getKeySpace(session))
+ val aggregateName: String = describeAggregate.aggregate
- Option(metaData.getKeyspace(keyspaceName)) match {
- case Some(keyspace) => {
- val aggMetas: List[AggregateMetadata] = keyspace.getAggregates.asScala.toList
- .filter(agg => agg.getSimpleName.toLowerCase == aggregateName.toLowerCase)
+ metaData.getKeyspace(keyspaceName).asScala match {
+ case Some(keyspace) =>
+ val aggMetas: Seq[AggregateMetadata] = keyspace.getAggregates.asScala.toSeq
+ .filter { case (sig, _) => sig.getName.asCql(true).toLowerCase == aggregateName.toLowerCase }
+ .map(_._2)
if(aggMetas.isEmpty) {
- throw new InterpreterException(s"Cannot find aggregate ${keyspaceName}.$aggregateName")
- } else {
- HTML_MAGIC + aggregateDisplay.format(describeAggregate.statement, aggMetas, true,
- session
- .getCluster
- .getConfiguration
- .getCodecRegistry)
+ throw new InterpreterException(s"Cannot find aggregate $keyspaceName.$aggregateName")
}
- }
- case None => throw new InterpreterException(s"Cannot find aggregate ${keyspaceName}.$aggregateName")
+ HTML_MAGIC + aggregateDisplay.format(describeAggregate.statement, aggMetas, withCaption = true,
+ session.getContext.getCodecRegistry)
+
+ case None => throw new InterpreterException(s"Cannot find aggregate $keyspaceName.$aggregateName")
}
}
private def execute(describeAggregates: DescribeAggregatesCmd): String = {
- val metadata: Metadata = session.getCluster.getMetadata
+ val metadata: Metadata = session.getMetadata
HTML_MAGIC + clusterDisplay.formatAllAggregates(describeAggregates.statement, metadata)
}
private def execute(describeMV: DescribeMaterializedViewCmd): String = {
- val metaData = session.getCluster.getMetadata
- val keyspaceName: String = describeMV.keyspace.orElse(Option(session.getLoggedKeyspace)).getOrElse("system")
+ val metaData = session.getMetadata
+ val keyspaceName: String = describeMV.keyspace.getOrElse(getKeySpace(session))
val viewName: String = describeMV.view
- Option(metaData.getKeyspace(keyspaceName)) match {
- case Some(keyspace) => {
- val viewMeta: Option[MaterializedViewMetadata] = Option(keyspace.getMaterializedView(viewName))
+ metaData.getKeyspace(keyspaceName).asScala match {
+ case Some(keyspace) =>
+ val viewMeta: Option[ViewMetadata] = keyspace.getView(viewName).asScala
viewMeta match {
- case Some(vMeta) => HTML_MAGIC + materializedViewDisplay.format(describeMV.statement, vMeta, true)
- case None => throw new InterpreterException(s"Cannot find materialized view ${keyspaceName}.$viewName")
+ case Some(vMeta) => HTML_MAGIC + materializedViewDisplay.format(describeMV.statement, vMeta, withCaption = true)
+ case None => throw new InterpreterException(s"Cannot find materialized view $keyspaceName.$viewName")
}
- }
- case None => throw new InterpreterException(s"Cannot find materialized view ${keyspaceName}.$viewName")
+
+ case None => throw new InterpreterException(s"Cannot find materialized view $keyspaceName.$viewName")
}
}
private def execute(describeMVs: DescribeMaterializedViewsCmd): String = {
- val metadata: Metadata = session.getCluster.getMetadata
+ val metadata: Metadata = session.getMetadata
HTML_MAGIC + clusterDisplay.formatAllMaterializedViews(describeMVs.statement, metadata)
}
@@ -184,38 +190,45 @@ class EnhancedSession(val session: Session) {
HTML_MAGIC + helpDisplay.formatHelp()
}
-
- private def execute(st: Statement): Any = {
- val rs = session.execute(st)
+ private def executeStatement[StatementT <: Statement[StatementT]](st: StatementT): Any = {
+ val rs: ResultSet = session.execute(st)
if (EnhancedSession.isDDLStatement(st)) {
if (!rs.getExecutionInfo.isSchemaInAgreement) {
- val metadata = session.getCluster.getMetadata
- while(!metadata.checkSchemaAgreement) {
+ val startTime = System.currentTimeMillis()
+ while(!session.checkSchemaAgreement()) {
LOGGER.info("Schema is still not in agreement, waiting...")
- Thread.sleep(DEFAULT_CHECK_TIME)
+ try {
+ Thread.sleep(DEFAULT_CHECK_TIME)
+ } catch {
+ case x: InterruptedException => None
+ }
+ val sinceStartMs = System.currentTimeMillis() - startTime // elapsed time in milliseconds, same unit as MAX_SCHEMA_AGREEMENT_WAIT
+ if (sinceStartMs > MAX_SCHEMA_AGREEMENT_WAIT) {
+ throw new RuntimeException(s"Can't achieve schema agreement after ${sinceStartMs / 1000} seconds")
+ }
}
}
}
rs
}
- def execute(st: Any): Any = {
+ def execute[StatementT <: Statement[StatementT]](st: Any): Any = {
st match {
- case x:DescribeClusterCmd => execute(x)
- case x:DescribeKeyspaceCmd => execute(x)
- case x:DescribeKeyspacesCmd => execute(x)
- case x:DescribeTableCmd => execute(x)
- case x:DescribeTablesCmd => execute(x)
- case x:DescribeTypeCmd => execute(x)
- case x:DescribeTypesCmd => execute(x)
- case x:DescribeFunctionCmd => execute(x)
- case x:DescribeFunctionsCmd => execute(x)
- case x:DescribeAggregateCmd => execute(x)
- case x:DescribeAggregatesCmd => execute(x)
- case x:DescribeMaterializedViewCmd => execute(x)
- case x:DescribeMaterializedViewsCmd => execute(x)
- case x:HelpCmd => execute(x)
- case x:Statement => execute(x)
+ case x: DescribeClusterCmd => execute(x)
+ case x: DescribeKeyspaceCmd => execute(x)
+ case x: DescribeKeyspacesCmd => execute(x)
+ case x: DescribeTableCmd => execute(x)
+ case x: DescribeTablesCmd => execute(x)
+ case x: DescribeTypeCmd => execute(x)
+ case x: DescribeTypesCmd => execute(x)
+ case x: DescribeFunctionCmd => execute(x)
+ case x: DescribeFunctionsCmd => execute(x)
+ case x: DescribeAggregateCmd => execute(x)
+ case x: DescribeAggregatesCmd => execute(x)
+ case x: DescribeMaterializedViewCmd => execute(x)
+ case x: DescribeMaterializedViewsCmd => execute(x)
+ case x: HelpCmd => execute(x)
+ case x: StatementT => executeStatement(x)
case _ => throw new InterpreterException(s"Cannot execute statement '$st' of type ${st.getClass}")
}
}
@@ -228,16 +241,44 @@ object EnhancedSession {
DDL_REGEX.matcher(query.trim).matches()
}
- def isDDLStatement(st: Statement): Boolean = {
+ def isDDLStatement[StatementT <: Statement[StatementT]](st: StatementT): Boolean = {
st match {
- case x:BoundStatement =>
- isDDLStatement(x.preparedStatement.getQueryString)
- case x:BatchStatement =>
- x.getStatements.asScala.seq.exists(isDDLStatement)
- case x:RegularStatement =>
- isDDLStatement(x.getQueryString)
+ case x: BoundStatement =>
+ isDDLStatement(x.getPreparedStatement.getQuery)
+ case x: BatchStatement =>
+ x.iterator().asScala.toSeq.exists {
+ case t: BoundStatement => isDDLStatement(t.getPreparedStatement.getQuery)
+ case t: SimpleStatement => isDDLStatement(t.getQuery)
+ }
+ case x: SimpleStatement =>
+ isDDLStatement(x.getQuery)
case _ => // only should be for StatementWrapper
true
}
}
+
+ def getCqlStatement[StatementT <: Statement[StatementT]](st: StatementT): String = {
+ st match {
+ case x: BoundStatement =>
+ x.getPreparedStatement.getQuery
+ case x: BatchStatement =>
+ val batchType = x.getBatchType
+ val timestamp = x.getQueryTimestamp
+ val timestampStr = if (timestamp == Long.MinValue) "" else " USING TIMESTAMP " + timestamp
+ val batchTypeStr = if (batchType == BatchType.COUNTER) "COUNTER "
+ else if (batchType == BatchType.UNLOGGED) "UNLOGGED "
+ else ""
+
+ "BEGIN " + batchTypeStr + "BATCH" + timestampStr + "\n" +
+ x.iterator().asScala.toSeq.map {
+ case t: BoundStatement => t.getPreparedStatement.getQuery
+ case t: SimpleStatement => t.getQuery
+ }.mkString("\n") + "\nAPPLY BATCH;"
+ case x: SimpleStatement =>
+ x.getQuery
+ case _ =>
+ ""
+ }
+ }
+
}
\ No newline at end of file
diff --git a/cassandra/src/main/scala/org/apache/zeppelin/cassandra/InterpreterLogic.scala b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/InterpreterLogic.scala
index a871abd..bd37c45 100644
--- a/cassandra/src/main/scala/org/apache/zeppelin/cassandra/InterpreterLogic.scala
+++ b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/InterpreterLogic.scala
@@ -20,24 +20,27 @@ import java.io.{ByteArrayOutputStream, PrintStream}
import java.net.InetAddress
import java.nio.ByteBuffer
import java.text.SimpleDateFormat
+import java.time.format.DateTimeFormatter
+import java.time.{Duration, Instant, LocalDateTime, ZoneOffset}
import java.util
-import java.util.Date
import java.util.concurrent.ConcurrentHashMap
-import com.datastax.driver.core.DataType.Name._
-import com.datastax.driver.core._
-import com.datastax.driver.core.exceptions.DriverException
-import com.datastax.driver.core.policies.{LoggingRetryPolicy, FallthroughRetryPolicy, DowngradingConsistencyRetryPolicy, Policies}
+import com.datastax.oss.driver.api.core.`type`.{DataType, ListType, MapType, SetType, TupleType, UserDefinedType}
+import com.datastax.oss.driver.api.core.`type`.DataTypes._
+import com.datastax.oss.driver.api.core.`type`.codec.TypeCodec
+import com.datastax.oss.driver.api.core.`type`.codec.registry.CodecRegistry
+import com.datastax.oss.driver.api.core.cql.{BatchStatement, BatchType, BatchableStatement, BoundStatement, ExecutionInfo, PreparedStatement, ResultSet, Row, SimpleStatement, Statement}
+import com.datastax.oss.driver.api.core.{ConsistencyLevel, CqlSession, DriverException, ProtocolVersion}
import org.apache.zeppelin.cassandra.TextBlockHierarchy._
-import org.apache.zeppelin.display.AngularObjectRegistry
import org.apache.zeppelin.display.ui.OptionInput.ParamOption
import org.apache.zeppelin.interpreter.InterpreterResult.Code
-import org.apache.zeppelin.interpreter.{InterpreterException, InterpreterResult, InterpreterContext}
-import org.slf4j.LoggerFactory
-import scala.collection.JavaConversions._
+import org.apache.zeppelin.interpreter.{InterpreterContext, InterpreterException, InterpreterResult}
+import org.slf4j.{Logger, LoggerFactory}
+
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
+import scala.util.matching.Regex
/**
@@ -46,14 +49,12 @@ import scala.collection.mutable.ArrayBuffer
* @param consistency consistency level
* @param serialConsistency serial consistency level
* @param timestamp timestamp
- * @param retryPolicy retry policy
* @param fetchSize query fetch size
* @param requestTimeOut request time out in millisecs
*/
case class CassandraQueryOptions(consistency: Option[ConsistencyLevel],
serialConsistency:Option[ConsistencyLevel],
timestamp: Option[Long],
- retryPolicy: Option[RetryPolicy],
fetchSize: Option[Int],
requestTimeOut: Option[Int])
@@ -63,27 +64,24 @@ case class CassandraQueryOptions(consistency: Option[ConsistencyLevel],
object InterpreterLogic {
val CHOICES_SEPARATOR : String = """\|"""
- val VARIABLE_PATTERN = """\{\{[^}]+\}\}""".r
- val SIMPLE_VARIABLE_DEFINITION_PATTERN = """\{\{([^=]+)=([^=]+)\}\}""".r
- val MULTIPLE_CHOICES_VARIABLE_DEFINITION_PATTERN = """\{\{([^=]+)=((?:[^=]+\|)+[^|]+)\}\}""".r
+ val VARIABLE_PATTERN: Regex = """\{\{[^}]+\}\}""".r
+ val SIMPLE_VARIABLE_DEFINITION_PATTERN: Regex = """\{\{([^=]+)=([^=]+)\}\}""".r
+ val MULTIPLE_CHOICES_VARIABLE_DEFINITION_PATTERN: Regex = """\{\{([^=]+)=((?:[^=]+\|)+[^|]+)\}\}""".r
val STANDARD_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss"
val ACCURATE_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss.SSS"
+ // TODO(alex): add more time formatters, like, ISO...
+ val STANDARD_DATE_FORMATTER: DateTimeFormatter = DateTimeFormatter.ofPattern(STANDARD_DATE_FORMAT)
+ val ACCURATE_DATE_FORMATTER: DateTimeFormatter = DateTimeFormatter.ofPattern(ACCURATE_DATE_FORMAT)
- val defaultRetryPolicy = Policies.defaultRetryPolicy()
- val downgradingConsistencyRetryPolicy = DowngradingConsistencyRetryPolicy.INSTANCE
- val fallThroughRetryPolicy = FallthroughRetryPolicy.INSTANCE
- val loggingDefaultRetryPolicy = new LoggingRetryPolicy(defaultRetryPolicy)
- val loggingDownGradingRetryPolicy = new LoggingRetryPolicy(downgradingConsistencyRetryPolicy)
- val loggingFallThroughRetryPolicy = new LoggingRetryPolicy(fallThroughRetryPolicy)
- val preparedStatements : mutable.Map[String,PreparedStatement] = new ConcurrentHashMap[String,PreparedStatement]().asScala
+ val preparedStatements : mutable.Map[String, PreparedStatement] = new ConcurrentHashMap[String,PreparedStatement]().asScala
- val logger = LoggerFactory.getLogger(classOf[InterpreterLogic])
+ val logger: Logger = LoggerFactory.getLogger(classOf[InterpreterLogic])
val paragraphParser = new ParagraphParser
val boundValuesParser = new BoundValuesParser
-
+
}
/**
@@ -93,18 +91,19 @@ object InterpreterLogic {
*
* @param session java driver session
*/
-class InterpreterLogic(val session: Session) {
+class InterpreterLogic(val session: CqlSession) {
val enhancedSession: EnhancedSession = new EnhancedSession(session)
import InterpreterLogic._
- def interpret(session:Session, stringStatements : String, context: InterpreterContext): InterpreterResult = {
+ def interpret[StatementT <: Statement[StatementT]](session:CqlSession, stringStatements : String,
+ context: InterpreterContext): InterpreterResult = {
logger.info(s"Executing CQL statements : \n\n$stringStatements\n")
try {
- val protocolVersion = session.getCluster.getConfiguration.getProtocolOptions.getProtocolVersion
+ val protocolVersion = session.getContext.getProtocolVersion
val queries:List[AnyBlock] = parseInput(stringStatements)
@@ -131,61 +130,64 @@ class InterpreterLogic(val session: Session) {
.map(_.getStatement[PrepareStm])
.foreach(statement => {
logger.debug(s"Get or prepare statement '${statement.name}' : ${statement.query}")
- preparedStatements.getOrElseUpdate(statement.name,session.prepare(statement.query))
+ preparedStatements.getOrElseUpdate(statement.name, session.prepare(statement.query))
})
- val statements: List[Any] = queryStatements
+ val statements: Seq[Any] = queryStatements
.filter(st => (st.statementType != PrepareStatementType) && (st.statementType != RemovePrepareStatementType))
- .map{
- case x:SimpleStm => generateSimpleStatement(x, queryOptions, context)
- case x:BatchStm => {
- val builtStatements: List[Statement] = x.statements.map {
- case st:SimpleStm => generateSimpleStatement(st, queryOptions, context)
- case st:BoundStm => generateBoundStatement(session, st, queryOptions, context)
+ .map {
+ case x: SimpleStm =>
+ generateSimpleStatement(x, queryOptions, context)
+
+ case x: BatchStm =>
+ val builtStatements = x.statements.map {
+ case st: SimpleStm => generateSimpleStatement(st, queryOptions, context)
+ case st: BoundStm => generateBoundStatement(session, st, queryOptions, context)
case _ => throw new InterpreterException(s"Unknown statement type")
}
generateBatchStatement(x.batchType, queryOptions, builtStatements)
- }
- case x:BoundStm => generateBoundStatement(session, x, queryOptions, context)
- case x:DescribeCommandStatement => x
- case x:HelpCmd => x
- case x => throw new InterpreterException(s"Unknown statement type : ${x}")
+
+ case x: BoundStm =>
+ generateBoundStatement(session, x, queryOptions, context)
+ case x: DescribeCommandStatement => x
+ case x: HelpCmd => x
+ case x => throw new InterpreterException(s"Unknown statement type : $x")
}
- val results: List[(Any,Any)] = for (statement <- statements) yield (enhancedSession.execute(statement),statement)
+ val results: Seq[(Any,Any)] = for (statement <- statements)
+ yield (enhancedSession.execute(statement),statement)
if (results.nonEmpty) {
results.last match {
- case(res: ResultSet, st: Statement) => buildResponseMessage((res, st), protocolVersion)
+ case(res: ResultSet, st: StatementT) =>
+ buildResponseMessage((res, st), protocolVersion)
case(output: String, _) => new InterpreterResult(Code.SUCCESS, output)
case _ => throw new InterpreterException(s"Cannot parse result type : ${results.last}")
}
-
} else {
new InterpreterResult(Code.SUCCESS, enhancedSession.displayNoResult)
}
-
} catch {
- case dex: DriverException => {
+ case dex: DriverException =>
logger.error(dex.getMessage, dex)
new InterpreterResult(Code.ERROR, parseException(dex))
- }
- case pex:ParsingException => {
+
+ case pex:ParsingException =>
logger.error(pex.getMessage, pex)
new InterpreterResult(Code.ERROR, pex.getMessage)
- }
- case iex: InterpreterException => {
+
+ case iex: InterpreterException =>
logger.error(iex.getMessage, iex)
new InterpreterResult(Code.ERROR, iex.getMessage)
- }
- case ex: java.lang.Exception => {
+
+ case ex: java.lang.Exception =>
logger.error(ex.getMessage, ex)
new InterpreterResult(Code.ERROR, parseException(ex))
- }
}
}
- def buildResponseMessage(lastResultSet: (ResultSet,Statement), protocolVersion: ProtocolVersion): InterpreterResult = {
+ def buildResponseMessage[StatementT <: Statement[StatementT]](lastResultSet: (ResultSet, StatementT),
+ protocolVersion: ProtocolVersion): InterpreterResult = {
val output = new StringBuilder()
val rows: collection.mutable.ArrayBuffer[Row] = ArrayBuffer()
@@ -196,10 +198,9 @@ class InterpreterLogic(val session: Session) {
val columnsDefinitions: List[(String, DataType)] = lastResultSet._1
.getColumnDefinitions
- .asList
- .toList // Java list -> Scala list
- .map(definition => (definition.getName, definition.getType))
-
+ .asScala
+ .toList
+ .map(definition => (definition.getName.asCql(true), definition.getType))
if (rows.nonEmpty) {
// Create table headers
@@ -211,15 +212,19 @@ class InterpreterLogic(val session: Session) {
rows.foreach {
row => {
val data = columnsDefinitions.map {
- case (name, dataType) => {
- if (row.isNull(name)) null else row.getObject(name)
- }
+ case (name, dataType) =>
+ if (row.isNull(name)) {
+ null
+ } else {
+ val value = row.getObject(name)
+ row.codecRegistry().codecFor(dataType, value).format(value)
+ }
}
output.append(data.mkString("\t")).append("\n")
}
}
} else {
- val lastQuery: String = lastResultSet._2.toString
+ val lastQuery: String = EnhancedSession.getCqlStatement(lastResultSet._2)
val executionInfo: ExecutionInfo = lastResultSet._1.getExecutionInfo
output.append(enhancedSession.displayExecutionStatistics(lastQuery, executionInfo))
}
@@ -232,14 +237,17 @@ class InterpreterLogic(val session: Session) {
def parseInput(input:String): List[AnyBlock] = {
val parsingResult: ParagraphParser#ParseResult[List[AnyBlock]] = paragraphParser.parseAll(paragraphParser.queries, input)
parsingResult match {
- case paragraphParser.Success(blocks,_) => blocks
- case paragraphParser.Failure(msg,next) => {
+ case paragraphParser.Success(blocks,_) =>
+ blocks
+
+ case paragraphParser.Failure(_,_) =>
throw new InterpreterException(s"Error parsing input:\n\t'$input'\nDid you forget to add ; (semi-colon) at the end of each CQL statement ?")
- }
- case paragraphParser.Error(msg,next) => {
+
+ case paragraphParser.Error(_,_) =>
throw new InterpreterException(s"Error parsing input:\n\t'$input'\nDid you forget to add ; (semi-colon) at the end of each CQL statement ?")
- }
- case _ => throw new InterpreterException(s"Error parsing input: $input")
+
+ case _ =>
+ throw new InterpreterException(s"Error parsing input: $input")
}
}
@@ -266,11 +274,6 @@ class InterpreterLogic(val session: Session) {
.flatMap(x => Option(x.value))
.headOption
- val retryPolicy: Option[RetryPolicy] = parameters
- .filter(_.paramType == RetryPolicyParam)
- .map(_.getParam[RetryPolicy])
- .headOption
-
val fetchSize: Option[Int] = parameters
.filter(_.paramType == FetchSizeParam)
.map(_.getParam[FetchSize])
@@ -283,34 +286,34 @@ class InterpreterLogic(val session: Session) {
.flatMap(x => Option(x.value))
.headOption
- CassandraQueryOptions(consistency,serialConsistency, timestamp, retryPolicy, fetchSize, requestTimeOut)
+ CassandraQueryOptions(consistency,serialConsistency, timestamp, fetchSize, requestTimeOut)
}
def generateSimpleStatement(st: SimpleStm, options: CassandraQueryOptions,context: InterpreterContext): SimpleStatement = {
logger.debug(s"Generating simple statement : '${st.text}'")
- val statement = new SimpleStatement(maybeExtractVariables(st.text, context))
+ val statement = SimpleStatement.newInstance(maybeExtractVariables(st.text, context))
applyQueryOptions(options, statement)
- statement
}
- def generateBoundStatement(session: Session, st: BoundStm, options: CassandraQueryOptions,context: InterpreterContext): BoundStatement = {
+ def generateBoundStatement(session: CqlSession, st: BoundStm, options: CassandraQueryOptions,context: InterpreterContext): BoundStatement = {
logger.debug(s"Generating bound statement with name : '${st.name}' and bound values : ${st.values}")
preparedStatements.get(st.name) match {
- case Some(ps) => {
+ case Some(ps) =>
val boundValues = maybeExtractVariables(st.values, context)
- createBoundStatement(session.getCluster.getConfiguration.getCodecRegistry, st.name, ps, boundValues)
- }
- case None => throw new InterpreterException(s"The statement '${st.name}' can not be bound to values. " +
+ createBoundStatement(session.getContext.getCodecRegistry, st.name, ps, boundValues)
+
+ case None =>
+ throw new InterpreterException(s"The statement '${st.name}' can not be bound to values. " +
s"Are you sure you did prepare it with @prepare[${st.name}] ?")
}
}
- def generateBatchStatement(batchType: BatchStatement.Type, options: CassandraQueryOptions, statements: List[Statement]): BatchStatement = {
- logger.debug(s"""Generating batch statement of type '${batchType} for ${statements.mkString(",")}'""")
- val batch = new BatchStatement(batchType)
- statements.foreach(batch.add(_))
+ def generateBatchStatement(batchType: BatchType,
+ options: CassandraQueryOptions,
+ statements: Seq[BatchableStatement[_]]): BatchStatement = {
+ logger.debug(s"""Generating batch statement of type '$batchType' for ${statements.mkString(",")}""")
+ val batch = BatchStatement.newInstance(batchType).addAll(statements:_*)
applyQueryOptions(options, batch)
- batch
}
def maybeExtractVariables(statement: String, context: InterpreterContext): String = {
@@ -324,57 +327,46 @@ class InterpreterLogic(val session: Session) {
paragraphScoped
}
- def extractVariableAndDefaultValue(statement: String, exp: String):String = {
- exp match {
- case MULTIPLE_CHOICES_VARIABLE_DEFINITION_PATTERN(variable, choices) => {
- val escapedExp: String = exp.replaceAll( """\{""", """\\{""").replaceAll( """\}""", """\\}""").replaceAll("""\|""","""\\|""")
- findInAngularRepository(variable) match {
- case Some(value) => statement.replaceAll(escapedExp,value.toString)
- case None => {
- val listChoices:List[String] = choices.trim.split(CHOICES_SEPARATOR).toList
- val paramOptions = listChoices.map(choice => new ParamOption(choice, choice))
- val selected = context.getGui.select(variable, paramOptions.toArray, listChoices.head)
- statement.replaceAll(escapedExp,selected.toString)
- }
- }
+ def extractVariableAndDefaultValue(statement: String, exp: String): String = exp match {
+ case MULTIPLE_CHOICES_VARIABLE_DEFINITION_PATTERN(variable, choices) =>
+ val escapedExp: String = exp.replaceAll( """\{""", """\\{""").replaceAll( """\}""", """\\}""").replaceAll("""\|""","""\\|""")
+ findInAngularRepository(variable) match {
+ case Some(value) => statement.replaceAll(escapedExp,value.toString)
+ case None =>
+ val listChoices:List[String] = choices.trim.split(CHOICES_SEPARATOR).toList
+ val paramOptions = listChoices.map(choice => new ParamOption(choice, choice))
+ val selected = context.getGui.select(variable, paramOptions.toArray, listChoices.head)
+ statement.replaceAll(escapedExp,selected.toString)
}
- case SIMPLE_VARIABLE_DEFINITION_PATTERN(variable,defaultVal) => {
- val escapedExp: String = exp.replaceAll( """\{""", """\\{""").replaceAll( """\}""", """\\}""")
- findInAngularRepository(variable) match {
- case Some(value) => statement.replaceAll(escapedExp,value.toString)
- case None => {
- val value = context.getGui.input(variable,defaultVal)
- statement.replaceAll(escapedExp,value.toString)
- }
- }
+
+ case SIMPLE_VARIABLE_DEFINITION_PATTERN(variable,defaultVal) =>
+ val escapedExp: String = exp.replaceAll( """\{""", """\\{""").replaceAll( """\}""", """\\}""")
+ findInAngularRepository(variable) match {
+ case Some(value) => statement.replaceAll(escapedExp,value.toString)
+ case None =>
+ val value = context.getGui.input(variable,defaultVal)
+ statement.replaceAll(escapedExp,value.toString)
}
- case _ => throw new ParsingException(s"Invalid bound variable definition for '$exp' in '$statement'. It should be of form 'variable=defaultValue' or 'variable=value1|value2|...|valueN'")
- }
+
+ case _ =>
+ throw new ParsingException(s"Invalid bound variable definition for '$exp' in '$statement'. It should be of form 'variable=defaultValue' or 'variable=value1|value2|...|valueN'")
}
- VARIABLE_PATTERN.findAllIn(statement).foldLeft(statement)(extractVariableAndDefaultValue _)
+ VARIABLE_PATTERN.findAllIn(statement).foldLeft(statement)(extractVariableAndDefaultValue)
}
- def applyQueryOptions(options: CassandraQueryOptions, statement: Statement): Unit = {
- options.consistency.foreach(statement.setConsistencyLevel(_))
- options.serialConsistency.foreach(statement.setSerialConsistencyLevel(_))
- options.timestamp.foreach(statement.setDefaultTimestamp(_))
- options.retryPolicy.foreach {
- case DefaultRetryPolicy => statement.setRetryPolicy(defaultRetryPolicy)
- case DowngradingRetryPolicy => statement.setRetryPolicy(downgradingConsistencyRetryPolicy)
- case FallThroughRetryPolicy => statement.setRetryPolicy(fallThroughRetryPolicy)
- case LoggingDefaultRetryPolicy => statement.setRetryPolicy(loggingDefaultRetryPolicy)
- case LoggingDowngradingRetryPolicy => statement.setRetryPolicy(loggingDownGradingRetryPolicy)
- case LoggingFallThroughRetryPolicy => statement.setRetryPolicy(loggingFallThroughRetryPolicy)
- case _ => throw new InterpreterException(s"""Unknown retry policy ${options.retryPolicy.getOrElse("???")}""")
- }
- options.fetchSize.foreach(statement.setFetchSize(_))
- options.requestTimeOut.foreach(statement.setReadTimeoutMillis(_))
+ def applyQueryOptions[StatementT <: Statement[StatementT]](options: CassandraQueryOptions, statement: StatementT): StatementT = {
+ val stmt1: StatementT = if (options.consistency.isDefined) statement.setConsistencyLevel(options.consistency.get) else statement
+ val stmt2: StatementT = if (options.serialConsistency.isDefined) stmt1.setSerialConsistencyLevel(options.serialConsistency.get) else stmt1
+ val stmt3: StatementT = if (options.timestamp.isDefined) stmt2.setQueryTimestamp(options.timestamp.get) else stmt2
+ val stmt4: StatementT = if (options.fetchSize.isDefined) stmt3.setPageSize(options.fetchSize.get) else stmt3
+ val stmt5: StatementT = if (options.requestTimeOut.isDefined) stmt4.setTimeout(Duration.ofMillis(options.requestTimeOut.get)) else stmt4
+ stmt5
}
- private def createBoundStatement(codecRegistry: CodecRegistry, name: String, ps: PreparedStatement, rawBoundValues: String): BoundStatement = {
- val dataTypes = ps.getVariables.toList
- .map(cfDef => cfDef.getType)
+ private def createBoundStatement(codecRegistry: CodecRegistry, name: String, ps: PreparedStatement,
+ rawBoundValues: String): BoundStatement = {
+ val dataTypes = ps.getVariableDefinitions.iterator.asScala.toSeq.map(cfDef => cfDef.getType)
val boundValuesAsText = parseBoundValues(name,rawBoundValues)
@@ -383,15 +375,15 @@ class InterpreterLogic(val session: Session) {
val convertedValues: List[AnyRef] = boundValuesAsText
.zip(dataTypes).map {
- case (value, dataType) => {
+ case (value, dataType) =>
if(value.trim == "null") {
null
} else {
val codec: TypeCodec[AnyRef] = codecRegistry.codecFor[AnyRef](dataType)
- dataType.getName match {
- case (ASCII | TEXT | VARCHAR) => value.trim.replaceAll("(?<!')'","")
- case (INT | VARINT) => value.trim.toInt
- case (BIGINT | COUNTER) => value.trim.toLong
+ dataType match {
+ case ASCII | TEXT => value.trim.replaceAll("(?<!')'","")
+ case INT | VARINT => value.trim.toInt
+ case BIGINT | COUNTER => value.trim.toLong
case BLOB => ByteBuffer.wrap(value.trim.getBytes)
case BOOLEAN => value.trim.toBoolean
case DECIMAL => BigDecimal(value.trim)
@@ -399,16 +391,15 @@ class InterpreterLogic(val session: Session) {
case FLOAT => value.trim.toFloat
case INET => InetAddress.getByName(value.trim)
case TIMESTAMP => parseDate(value.trim)
- case (UUID | TIMEUUID) => java.util.UUID.fromString(value.trim)
- case LIST => codec.parse(boundValuesParser.parse(boundValuesParser.list, value).get)
- case SET => codec.parse(boundValuesParser.parse(boundValuesParser.set, value).get)
- case MAP => codec.parse(boundValuesParser.parse(boundValuesParser.map, value).get)
- case UDT => codec.parse(boundValuesParser.parse(boundValuesParser.udt, value).get)
- case TUPLE => codec.parse(boundValuesParser.parse(boundValuesParser.tuple, value).get)
+ case UUID | TIMEUUID => java.util.UUID.fromString(value.trim)
+ case _: ListType => codec.parse(boundValuesParser.parse(boundValuesParser.list, value).get)
+ case _: SetType => codec.parse(boundValuesParser.parse(boundValuesParser.set, value).get)
+ case _: MapType => codec.parse(boundValuesParser.parse(boundValuesParser.map, value).get)
+ case _: UserDefinedType => codec.parse(boundValuesParser.parse(boundValuesParser.udt, value).get)
+ case _: TupleType => codec.parse(boundValuesParser.parse(boundValuesParser.tuple, value).get)
case _ => throw new InterpreterException(s"Cannot parse data of type : ${dataType.toString}")
}
}
- }
}.asInstanceOf[List[AnyRef]]
ps.bind(convertedValues.toArray: _*)
@@ -423,13 +414,15 @@ class InterpreterLogic(val session: Session) {
}
}
- def parseDate(dateString: String): Date = {
- dateString match {
- case boundValuesParser.STANDARD_DATE_PATTERN(datePattern) => new SimpleDateFormat(STANDARD_DATE_FORMAT).parse(datePattern)
- case boundValuesParser.ACCURATE_DATE_PATTERN(datePattern) => new SimpleDateFormat(ACCURATE_DATE_FORMAT).parse(datePattern)
+ def parseDate(dateString: String): Instant = {
+ val formatter = dateString match {
+ case boundValuesParser.STANDARD_DATE_PATTERN(_) => STANDARD_DATE_FORMATTER
+ case boundValuesParser.ACCURATE_DATE_PATTERN(_) => ACCURATE_DATE_FORMATTER
case _ => throw new InterpreterException(s"Cannot parse date '$dateString'. " +
s"Accepted formats : $STANDARD_DATE_FORMAT OR $ACCURATE_DATE_FORMAT");
}
+ // TODO(alex): check about timezone...
+ LocalDateTime.parse(dateString, formatter).toInstant(ZoneOffset.UTC)
}
def parseException(ex: Exception): String = {
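Note on the parseDate change above: the new version parses the text as a LocalDateTime and then pins it to UTC, which is what the TODO about time zones refers to. The standalone sketch below shows that conversion using only java.time. The object name, the two pattern strings, and the dot-based choice of formatter are assumptions made for illustration; the interpreter's own STANDARD_DATE_FORMATTER and ACCURATE_DATE_FORMATTER are defined elsewhere in the commit and are selected by regular expression.

```scala
import java.time.{Instant, LocalDateTime, ZoneOffset}
import java.time.format.DateTimeFormatter

object ParseDateSketch {
  // Assumed patterns, for illustration only.
  val standardFormatter: DateTimeFormatter =
    DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
  val accurateFormatter: DateTimeFormatter =
    DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")

  // Parse a zone-less timestamp and interpret it as UTC.
  def parseUtc(text: String): Instant = {
    // Hypothetical selection rule: a dot means milliseconds are present.
    val formatter = if (text.contains(".")) accurateFormatter else standardFormatter
    LocalDateTime.parse(text, formatter).toInstant(ZoneOffset.UTC)
  }
}
```

Because the offset is fixed to UTC, the same input text yields the same Instant regardless of the JVM's default time zone.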
diff --git a/cassandra/src/main/scala/org/apache/zeppelin/cassandra/JavaDriverConfig.scala b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/JavaDriverConfig.scala
index 865d89f..1834e7e 100644
--- a/cassandra/src/main/scala/org/apache/zeppelin/cassandra/JavaDriverConfig.scala
+++ b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/JavaDriverConfig.scala
@@ -18,296 +18,130 @@ package org.apache.zeppelin.cassandra
import java.lang.Boolean._
-import com.datastax.driver.core.HostDistance._
-import com.datastax.driver.core.ProtocolOptions.Compression
-import com.datastax.driver.core._
-import com.datastax.driver.core.policies._
+import com.datastax.oss.driver.api.core.ProtocolVersion
+import com.datastax.oss.driver.api.core.config.{DefaultDriverOption, ProgrammaticDriverConfigLoaderBuilder}
import org.apache.commons.lang3.StringUtils._
import org.apache.zeppelin.interpreter.Interpreter
import org.apache.zeppelin.cassandra.CassandraInterpreter._
-import org.slf4j.LoggerFactory
+import org.slf4j.{Logger, LoggerFactory}
/**
* Utility class to extract and configure the Java driver
*/
class JavaDriverConfig {
+ val LOGGER: Logger = LoggerFactory.getLogger(classOf[JavaDriverConfig])
- val LOGGER = LoggerFactory.getLogger(classOf[JavaDriverConfig])
+ def setSocketOptions(intpr: Interpreter, configBuilder: ProgrammaticDriverConfigLoaderBuilder): Unit = {
+ val connectTimeoutMillis: Int = intpr.getProperty(CASSANDRA_SOCKET_CONNECTION_TIMEOUT_MILLIS,
+ CassandraInterpreter.DEFAULT_CONNECTION_TIMEOUT).toInt
+ configBuilder.withInt(DefaultDriverOption.CONNECTION_INIT_QUERY_TIMEOUT, connectTimeoutMillis)
+ configBuilder.withInt(DefaultDriverOption.CONTROL_CONNECTION_TIMEOUT, connectTimeoutMillis)
- def getSocketOptions(intpr: Interpreter): SocketOptions = {
- val socketOptions: SocketOptions = new SocketOptions
- val socketOptionsInfo: StringBuilder = new StringBuilder("Socket options : \n\n")
+ val readTimeoutMillis: Int = intpr.getProperty(CASSANDRA_SOCKET_READ_TIMEOUT_MILLIS,
+ CassandraInterpreter.DEFAULT_READ_TIMEOUT).toInt
+ configBuilder.withInt(DefaultDriverOption.REQUEST_TIMEOUT, readTimeoutMillis)
- val connectTimeoutMillis: Int = intpr.getProperty(CASSANDRA_SOCKET_CONNECTION_TIMEOUT_MILLIS).toInt
- socketOptions.setConnectTimeoutMillis(connectTimeoutMillis)
- socketOptionsInfo
- .append("\t")
- .append(CASSANDRA_SOCKET_CONNECTION_TIMEOUT_MILLIS)
- .append(" : ")
- .append(connectTimeoutMillis).append("\n")
-
- val readTimeoutMillis: Int = intpr.getProperty(CASSANDRA_SOCKET_READ_TIMEOUT_MILLIS).toInt
- socketOptions.setReadTimeoutMillis(readTimeoutMillis)
- socketOptionsInfo
- .append("\t")
- .append(CASSANDRA_SOCKET_READ_TIMEOUT_MILLIS)
- .append(" : ")
- .append(readTimeoutMillis).append("\n")
-
- val tcpNoDelay: Boolean = parseBoolean(intpr.getProperty(CASSANDRA_SOCKET_TCP_NO_DELAY))
- socketOptions.setTcpNoDelay(tcpNoDelay)
- socketOptionsInfo
- .append("\t")
- .append(CASSANDRA_SOCKET_TCP_NO_DELAY)
- .append(" : ")
- .append(tcpNoDelay)
- .append("\n")
+ val tcpNoDelay = intpr.getProperty(CASSANDRA_SOCKET_TCP_NO_DELAY, CassandraInterpreter.DEFAULT_TCP_NO_DELAY)
+ if (isNotBlank(tcpNoDelay)) {
+ configBuilder.withBoolean(DefaultDriverOption.SOCKET_TCP_NODELAY, parseBoolean(tcpNoDelay))
+ }
val keepAlive: String = intpr.getProperty(CASSANDRA_SOCKET_KEEP_ALIVE)
if (isNotBlank(keepAlive)) {
- val keepAliveValue: Boolean = parseBoolean(keepAlive)
- socketOptions.setKeepAlive(keepAliveValue)
- socketOptionsInfo
- .append("\t")
- .append(CASSANDRA_SOCKET_KEEP_ALIVE)
- .append(" : ")
- .append(keepAliveValue).append("\n")
+ configBuilder.withBoolean(DefaultDriverOption.SOCKET_KEEP_ALIVE, parseBoolean(keepAlive))
}
val receivedBuffSize: String = intpr.getProperty(CASSANDRA_SOCKET_RECEIVED_BUFFER_SIZE_BYTES)
if (isNotBlank(receivedBuffSize)) {
- val receiveBufferSizeValue: Int = receivedBuffSize.toInt
- socketOptions.setReceiveBufferSize(receiveBufferSizeValue)
- socketOptionsInfo
- .append("\t")
- .append(CASSANDRA_SOCKET_RECEIVED_BUFFER_SIZE_BYTES)
- .append(" : ")
- .append(receiveBufferSizeValue)
- .append("\n")
+ configBuilder.withInt(DefaultDriverOption.SOCKET_RECEIVE_BUFFER_SIZE, receivedBuffSize.toInt)
}
val sendBuffSize: String = intpr.getProperty(CASSANDRA_SOCKET_SEND_BUFFER_SIZE_BYTES)
if (isNotBlank(sendBuffSize)) {
- val sendBufferSizeValue: Int = sendBuffSize.toInt
- socketOptions.setSendBufferSize(sendBufferSizeValue)
- socketOptionsInfo
- .append("\t")
- .append(CASSANDRA_SOCKET_SEND_BUFFER_SIZE_BYTES)
- .append(" : ")
- .append(sendBufferSizeValue)
- .append("\n")
+ configBuilder.withInt(DefaultDriverOption.SOCKET_SEND_BUFFER_SIZE, sendBuffSize.toInt)
}
val reuseAddress: String = intpr.getProperty(CASSANDRA_SOCKET_REUSE_ADDRESS)
if (isNotBlank(reuseAddress)) {
- val reuseAddressValue: Boolean = parseBoolean(reuseAddress)
- socketOptions.setReuseAddress(reuseAddressValue)
- socketOptionsInfo
- .append("\t")
- .append(CASSANDRA_SOCKET_REUSE_ADDRESS)
- .append(" : ")
- .append(reuseAddressValue)
- .append("\n")
+ configBuilder.withBoolean(DefaultDriverOption.SOCKET_REUSE_ADDRESS, parseBoolean(reuseAddress))
}
val soLinger: String = intpr.getProperty(CASSANDRA_SOCKET_SO_LINGER)
if (isNotBlank(soLinger)) {
- val soLingerValue: Int = soLinger.toInt
- socketOptions.setSoLinger(soLingerValue)
- socketOptionsInfo
- .append("\t")
- .append(CASSANDRA_SOCKET_SO_LINGER)
- .append(" : ")
- .append(soLingerValue)
- .append("\n")
+ configBuilder.withInt(DefaultDriverOption.SOCKET_LINGER_INTERVAL, soLinger.toInt)
}
-
- LOGGER.debug(socketOptionsInfo.append("\n").toString)
-
- return socketOptions
}
- def getQueryOptions(intpr: Interpreter): QueryOptions = {
- val queryOptions: QueryOptions = new QueryOptions
- val queryOptionsInfo: StringBuilder = new StringBuilder("Query options : \n\n")
+ def setQueryOptions(intpr: Interpreter, configBuilder: ProgrammaticDriverConfigLoaderBuilder): Unit = {
+ val consistencyLevel = intpr.getProperty(CASSANDRA_QUERY_DEFAULT_CONSISTENCY,
+ CassandraInterpreter.DEFAULT_CONSISTENCY)
+ configBuilder.withString(DefaultDriverOption.REQUEST_CONSISTENCY, consistencyLevel)
- val consistencyLevel: ConsistencyLevel = ConsistencyLevel.valueOf(intpr.getProperty(CASSANDRA_QUERY_DEFAULT_CONSISTENCY))
- queryOptions.setConsistencyLevel(consistencyLevel)
- queryOptionsInfo
- .append("\t")
- .append(CASSANDRA_QUERY_DEFAULT_CONSISTENCY)
- .append(" : ")
- .append(consistencyLevel)
- .append("\n")
+ val serialConsistencyLevel = intpr.getProperty(CASSANDRA_QUERY_DEFAULT_SERIAL_CONSISTENCY,
+ CassandraInterpreter.DEFAULT_SERIAL_CONSISTENCY)
+ configBuilder.withString(DefaultDriverOption.REQUEST_SERIAL_CONSISTENCY, serialConsistencyLevel)
- val serialConsistencyLevel: ConsistencyLevel = ConsistencyLevel.valueOf(intpr.getProperty(CASSANDRA_QUERY_DEFAULT_SERIAL_CONSISTENCY))
- queryOptions.setSerialConsistencyLevel(serialConsistencyLevel)
- queryOptionsInfo
- .append("\t")
- .append(CASSANDRA_QUERY_DEFAULT_SERIAL_CONSISTENCY)
- .append(" : ")
- .append(serialConsistencyLevel)
- .append("\n")
+ val fetchSize = intpr.getProperty(CASSANDRA_QUERY_DEFAULT_FETCH_SIZE,
+ CassandraInterpreter.DEFAULT_FETCH_SIZE).toInt
+ configBuilder.withInt(DefaultDriverOption.REQUEST_PAGE_SIZE, fetchSize)
- val fetchSize: Int = intpr.getProperty(CASSANDRA_QUERY_DEFAULT_FETCH_SIZE).toInt
- queryOptions.setFetchSize(fetchSize)
- queryOptionsInfo
- .append("\t")
- .append(CASSANDRA_QUERY_DEFAULT_FETCH_SIZE)
- .append(" : ")
- .append(fetchSize)
- .append("\n")
-
- val defaultIdempotence: Boolean = parseBoolean(intpr.getProperty(CASSANDRA_QUERY_DEFAULT_IDEMPOTENCE))
- queryOptions.setDefaultIdempotence(defaultIdempotence)
- queryOptionsInfo
- .append("\t")
- .append(CASSANDRA_QUERY_DEFAULT_IDEMPOTENCE)
- .append(" : ")
- .append(defaultIdempotence)
- .append("\n")
-
- LOGGER.debug(queryOptionsInfo.append("\n").toString)
-
- return queryOptions
+ configBuilder.withBoolean(DefaultDriverOption.REQUEST_DEFAULT_IDEMPOTENCE,
+ parseBoolean(intpr.getProperty(CASSANDRA_QUERY_DEFAULT_IDEMPOTENCE)))
}
- def getProtocolVersion(intpr: Interpreter): ProtocolVersion = {
- val protocolVersion: String = intpr.getProperty(CASSANDRA_PROTOCOL_VERSION)
+ val PROTOCOL_MAPPING: Map[String, ProtocolVersion] = Map("3" -> ProtocolVersion.V3, "4" -> ProtocolVersion.V4,
+ "5" -> ProtocolVersion.V5, "DSE1" -> ProtocolVersion.DSE_V1, "DSE2" -> ProtocolVersion.DSE_V2)
+
+ def setProtocolVersion(intpr: Interpreter, configBuilder: ProgrammaticDriverConfigLoaderBuilder): Unit = {
+ val protocolVersion: String = intpr.getProperty(CASSANDRA_PROTOCOL_VERSION,
+ CassandraInterpreter.DEFAULT_PROTOCOL_VERSION)
LOGGER.debug("Protocol version : " + protocolVersion)
protocolVersion match {
- case "1" =>
- defaultMaxConnectionPerHostLocal = "8"
- defaultMaxConnectionPerHostRemote = "2"
- defaultCoreConnectionPerHostLocal = "2"
- defaultCoreConnectionPerHostRemote = "1"
- defaultNewConnectionThresholdLocal = "100"
- defaultNewConnectionThresholdRemote = "1"
- defaultMaxRequestPerConnectionLocal = "128"
- defaultMaxRequestPerConnectionRemote = "128"
- return ProtocolVersion.V1
- case "2" =>
- defaultMaxConnectionPerHostLocal = "8"
- defaultMaxConnectionPerHostRemote = "2"
- defaultCoreConnectionPerHostLocal = "2"
- defaultCoreConnectionPerHostRemote = "1"
- defaultNewConnectionThresholdLocal = "100"
- defaultNewConnectionThresholdRemote = "1"
- defaultMaxRequestPerConnectionLocal = "128"
- defaultMaxRequestPerConnectionRemote = "128"
- return ProtocolVersion.V2
- case "3" =>
- defaultMaxConnectionPerHostLocal = "1"
- defaultMaxConnectionPerHostRemote = "1"
- defaultCoreConnectionPerHostLocal = "1"
- defaultCoreConnectionPerHostRemote = "1"
- defaultNewConnectionThresholdLocal = "800"
- defaultNewConnectionThresholdRemote = "200"
- defaultMaxRequestPerConnectionLocal = "1024"
- defaultMaxRequestPerConnectionRemote = "256"
- return ProtocolVersion.V3
- case "4" =>
- defaultMaxConnectionPerHostLocal = "1"
- defaultMaxConnectionPerHostRemote = "1"
- defaultCoreConnectionPerHostLocal = "1"
- defaultCoreConnectionPerHostRemote = "1"
- defaultNewConnectionThresholdLocal = "800"
- defaultNewConnectionThresholdRemote = "200"
- defaultMaxRequestPerConnectionLocal = "1024"
- defaultMaxRequestPerConnectionRemote = "256"
- return ProtocolVersion.V4
+ case "1" | "2" =>
+ throw new RuntimeException(s"Protocol V${protocolVersion} isn't supported")
case _ =>
- defaultMaxConnectionPerHostLocal = "1"
- defaultMaxConnectionPerHostRemote = "1"
- defaultCoreConnectionPerHostLocal = "1"
- defaultCoreConnectionPerHostRemote = "1"
- defaultNewConnectionThresholdLocal = "800"
- defaultNewConnectionThresholdRemote = "200"
- defaultMaxRequestPerConnectionLocal = "1024"
- defaultMaxRequestPerConnectionRemote = "256"
- return ProtocolVersion.NEWEST_SUPPORTED
+ configBuilder.withString(DefaultDriverOption.PROTOCOL_VERSION,
+ PROTOCOL_MAPPING.getOrElse(protocolVersion, ProtocolVersion.DEFAULT).name())
}
}
- def getPoolingOptions(intpr: Interpreter): PoolingOptions = {
- val poolingOptions: PoolingOptions = new PoolingOptions
+ def setPoolingOptions(intpr: Interpreter, configBuilder: ProgrammaticDriverConfigLoaderBuilder): Unit = {
val poolingOptionsInfo: StringBuilder = new StringBuilder("Pooling options : \n\n")
- val maxConnPerHostLocal: Int = intpr.getProperty(CASSANDRA_POOLING_MAX_CONNECTION_PER_HOST_LOCAL).toInt
- poolingOptions.setMaxConnectionsPerHost(LOCAL, maxConnPerHostLocal)
- poolingOptionsInfo
- .append("\t")
- .append(CASSANDRA_POOLING_MAX_CONNECTION_PER_HOST_LOCAL)
- .append(" : ")
- .append(maxConnPerHostLocal)
- .append("\n")
-
- val maxConnPerHostRemote: Int = intpr.getProperty(CASSANDRA_POOLING_MAX_CONNECTION_PER_HOST_REMOTE).toInt
- poolingOptions.setMaxConnectionsPerHost(REMOTE, maxConnPerHostRemote)
- poolingOptionsInfo
- .append("\t")
- .append(CASSANDRA_POOLING_MAX_CONNECTION_PER_HOST_REMOTE)
- .append(" : ")
- .append(maxConnPerHostRemote)
- .append("\n")
-
- val coreConnPerHostLocal: Int = intpr.getProperty(CASSANDRA_POOLING_CORE_CONNECTION_PER_HOST_LOCAL).toInt
- poolingOptions.setCoreConnectionsPerHost(LOCAL, coreConnPerHostLocal)
+ val coreConnPerHostLocal: Int = intpr.getProperty(CASSANDRA_POOLING_CONNECTION_PER_HOST_LOCAL,
+ DEFAULT_CONNECTIONS_PER_HOST).toInt
+ configBuilder.withInt(DefaultDriverOption.CONNECTION_POOL_LOCAL_SIZE, coreConnPerHostLocal)
poolingOptionsInfo
.append("\t")
- .append(CASSANDRA_POOLING_CORE_CONNECTION_PER_HOST_LOCAL)
+ .append(CASSANDRA_POOLING_CONNECTION_PER_HOST_LOCAL)
.append(" : ")
.append(coreConnPerHostLocal)
.append("\n")
- val coreConnPerHostRemote: Int = intpr.getProperty(CASSANDRA_POOLING_CORE_CONNECTION_PER_HOST_REMOTE).toInt
- poolingOptions.setCoreConnectionsPerHost(REMOTE, coreConnPerHostRemote)
+ val coreConnPerHostRemote: Int = intpr.getProperty(CASSANDRA_POOLING_CONNECTION_PER_HOST_REMOTE,
+ DEFAULT_CONNECTIONS_PER_HOST).toInt
+ configBuilder.withInt(DefaultDriverOption.CONNECTION_POOL_REMOTE_SIZE, coreConnPerHostRemote)
poolingOptionsInfo
.append("\t")
- .append(CASSANDRA_POOLING_CORE_CONNECTION_PER_HOST_REMOTE)
+ .append(CASSANDRA_POOLING_CONNECTION_PER_HOST_REMOTE)
.append(" : ")
.append(coreConnPerHostRemote)
.append("\n")
- val newConnThresholdLocal: Int = intpr.getProperty(CASSANDRA_POOLING_NEW_CONNECTION_THRESHOLD_LOCAL).toInt
- poolingOptions.setNewConnectionThreshold(LOCAL, newConnThresholdLocal)
- poolingOptionsInfo
- .append("\t")
- .append(CASSANDRA_POOLING_NEW_CONNECTION_THRESHOLD_LOCAL)
- .append(" : ")
- .append(newConnThresholdLocal)
- .append("\n")
-
- val newConnThresholdRemote: Int = intpr.getProperty(CASSANDRA_POOLING_NEW_CONNECTION_THRESHOLD_REMOTE).toInt
- poolingOptions.setNewConnectionThreshold(REMOTE, newConnThresholdRemote)
- poolingOptionsInfo
- .append("\t")
- .append(CASSANDRA_POOLING_NEW_CONNECTION_THRESHOLD_REMOTE)
- .append(" : ")
- .append(newConnThresholdRemote)
- .append("\n")
-
- val maxReqPerConnLocal: Int = intpr.getProperty(CASSANDRA_POOLING_MAX_REQUESTS_PER_CONNECTION_LOCAL).toInt
- poolingOptions.setMaxRequestsPerConnection(LOCAL, maxReqPerConnLocal)
+ val maxReqPerConnLocal: Int = intpr.getProperty(CASSANDRA_POOLING_MAX_REQUESTS_PER_CONNECTION,
+ DEFAULT_MAX_REQUEST_PER_CONNECTION).toInt
+ configBuilder.withInt(DefaultDriverOption.CONNECTION_MAX_REQUESTS, maxReqPerConnLocal)
poolingOptionsInfo
.append("\t")
- .append(CASSANDRA_POOLING_MAX_REQUESTS_PER_CONNECTION_LOCAL)
+ .append(CASSANDRA_POOLING_MAX_REQUESTS_PER_CONNECTION)
.append(" : ")
.append(maxReqPerConnLocal)
.append("\n")
- val maxReqPerConnRemote: Int = intpr.getProperty(CASSANDRA_POOLING_MAX_REQUESTS_PER_CONNECTION_REMOTE).toInt
- poolingOptions.setMaxRequestsPerConnection(REMOTE, maxReqPerConnRemote)
- poolingOptionsInfo
- .append("\t")
- .append(CASSANDRA_POOLING_MAX_REQUESTS_PER_CONNECTION_REMOTE)
- .append(" : ")
- .append(maxReqPerConnRemote)
- .append("\n")
-
- val heartbeatIntervalSeconds: Int = intpr.getProperty(CASSANDRA_POOLING_HEARTBEAT_INTERVAL_SECONDS).toInt
- poolingOptions.setHeartbeatIntervalSeconds(heartbeatIntervalSeconds)
+ val heartbeatIntervalSeconds: Int = intpr.getProperty(CASSANDRA_POOLING_HEARTBEAT_INTERVAL_SECONDS,
+ DEFAULT_HEARTBEAT_INTERVAL).toInt
+ configBuilder.withInt(DefaultDriverOption.HEARTBEAT_INTERVAL, heartbeatIntervalSeconds)
poolingOptionsInfo
.append("\t")
.append(CASSANDRA_POOLING_HEARTBEAT_INTERVAL_SECONDS)
@@ -315,126 +149,59 @@ class JavaDriverConfig {
.append(heartbeatIntervalSeconds)
.append("\n")
- val idleTimeoutSeconds: Int = intpr.getProperty(CASSANDRA_POOLING_IDLE_TIMEOUT_SECONDS).toInt
- poolingOptions.setIdleTimeoutSeconds(idleTimeoutSeconds)
- poolingOptionsInfo
- .append("\t")
- .append(CASSANDRA_POOLING_IDLE_TIMEOUT_SECONDS)
- .append(" : ")
- .append(idleTimeoutSeconds)
- .append("\n")
-
- val poolTimeoutMillis: Int = intpr.getProperty(CASSANDRA_POOLING_POOL_TIMEOUT_MILLIS).toInt
- poolingOptions.setPoolTimeoutMillis(poolTimeoutMillis)
+ val idleTimeoutSeconds: Int = intpr.getProperty(CASSANDRA_POOLING_POOL_TIMEOUT_MILLIS,
+ DEFAULT_POOL_TIMEOUT).toInt
+ configBuilder.withInt(DefaultDriverOption.HEARTBEAT_TIMEOUT, idleTimeoutSeconds)
poolingOptionsInfo
.append("\t")
.append(CASSANDRA_POOLING_POOL_TIMEOUT_MILLIS)
.append(" : ")
- .append(poolTimeoutMillis)
+ .append(idleTimeoutSeconds)
.append("\n")
LOGGER.debug(poolingOptionsInfo.append("\n").toString)
-
- return poolingOptions
}
- def getCompressionProtocol(intpr: Interpreter): ProtocolOptions.Compression = {
- var compression: ProtocolOptions.Compression = null
- val compressionProtocol: String = intpr.getProperty(CASSANDRA_COMPRESSION_PROTOCOL)
-
+ def setCompressionProtocol(intpr: Interpreter, configBuilder: ProgrammaticDriverConfigLoaderBuilder): Unit = {
+ val compressionProtocol = intpr.getProperty(CASSANDRA_COMPRESSION_PROTOCOL,
+ CassandraInterpreter.DEFAULT_COMPRESSION).toLowerCase
LOGGER.debug("Compression protocol : " + compressionProtocol)
- if (compressionProtocol == null) "NONE"
- else compressionProtocol.toUpperCase match {
- case "NONE" =>
- compression = Compression.NONE
- case "SNAPPY" =>
- compression = Compression.SNAPPY
- case "LZ4" =>
- compression = Compression.LZ4
- case _ =>
- compression = Compression.NONE
+ compressionProtocol match {
+ case "snappy" | "lz4" =>
+ configBuilder.withString(DefaultDriverOption.PROTOCOL_COMPRESSION, compressionProtocol)
+ case _ => ()
}
- return compression
}
- def getLoadBalancingPolicy(intpr: Interpreter): LoadBalancingPolicy = {
- val loadBalancingPolicy: String = intpr.getProperty(CASSANDRA_LOAD_BALANCING_POLICY)
- LOGGER.debug("Load Balancing Policy : " + loadBalancingPolicy)
-
- if (isBlank(loadBalancingPolicy) || (DEFAULT_POLICY == loadBalancingPolicy)) {
- return Policies.defaultLoadBalancingPolicy
- }
- else {
- try {
- return (Class.forName(loadBalancingPolicy).asInstanceOf[Class[LoadBalancingPolicy]]).newInstance
- }
- catch {
- case e: Any => {
- e.printStackTrace
- throw new RuntimeException("Cannot instantiate " + CASSANDRA_LOAD_BALANCING_POLICY + " = " + loadBalancingPolicy)
- }
- }
- }
+ private def isNotDefaultParameter(param: String) = {
+ !(isBlank(param) || DEFAULT_POLICY == param)
}
- def getRetryPolicy(intpr: Interpreter): RetryPolicy = {
+ def setRetryPolicy(intpr: Interpreter, configBuilder: ProgrammaticDriverConfigLoaderBuilder): Unit = {
val retryPolicy: String = intpr.getProperty(CASSANDRA_RETRY_POLICY)
LOGGER.debug("Retry Policy : " + retryPolicy)
- if (isBlank(retryPolicy) || (DEFAULT_POLICY == retryPolicy)) {
- return Policies.defaultRetryPolicy
- }
- else {
- try {
- return (Class.forName(retryPolicy).asInstanceOf[Class[RetryPolicy]]).newInstance
- }
- catch {
- case e: Any => {
- e.printStackTrace
- throw new RuntimeException("Cannot instantiate " + CASSANDRA_RETRY_POLICY + " = " + retryPolicy)
- }
- }
+ if (isNotDefaultParameter(retryPolicy)) {
+ configBuilder.withString(DefaultDriverOption.RETRY_POLICY_CLASS, retryPolicy)
}
}
- def getReconnectionPolicy(intpr: Interpreter): ReconnectionPolicy = {
+ def setReconnectionPolicy(intpr: Interpreter, configBuilder: ProgrammaticDriverConfigLoaderBuilder): Unit = {
val reconnectionPolicy: String = intpr.getProperty(CASSANDRA_RECONNECTION_POLICY)
LOGGER.debug("Reconnection Policy : " + reconnectionPolicy)
- if (isBlank(reconnectionPolicy) || (DEFAULT_POLICY == reconnectionPolicy)) {
- return Policies.defaultReconnectionPolicy
- }
- else {
- try {
- return (Class.forName(reconnectionPolicy).asInstanceOf[Class[ReconnectionPolicy]]).newInstance
- }
- catch {
- case e: Any => {
- e.printStackTrace
- throw new RuntimeException("Cannot instantiate " + CASSANDRA_RECONNECTION_POLICY + " = " + reconnectionPolicy)
- }
- }
+ if (isNotDefaultParameter(reconnectionPolicy)) {
+ configBuilder.withString(DefaultDriverOption.RECONNECTION_POLICY_CLASS, reconnectionPolicy)
}
}
- def getSpeculativeExecutionPolicy(intpr: Interpreter): SpeculativeExecutionPolicy = {
+ def setSpeculativeExecutionPolicy(intpr: Interpreter, configBuilder: ProgrammaticDriverConfigLoaderBuilder): Unit = {
val specExecPolicy: String = intpr.getProperty(CASSANDRA_SPECULATIVE_EXECUTION_POLICY)
LOGGER.debug("Speculative Execution Policy : " + specExecPolicy)
- if (isBlank(specExecPolicy) || (DEFAULT_POLICY == specExecPolicy)) {
- return Policies.defaultSpeculativeExecutionPolicy
+ if (isNotDefaultParameter(specExecPolicy)) {
+ configBuilder.withString(DefaultDriverOption.SPECULATIVE_EXECUTION_POLICY_CLASS, specExecPolicy)
}
- else {
- try {
- return (Class.forName(specExecPolicy).asInstanceOf[Class[SpeculativeExecutionPolicy]]).newInstance
- }
- catch {
- case e: Any => {
- e.printStackTrace
- throw new RuntimeException("Cannot instantiate " + CASSANDRA_SPECULATIVE_EXECUTION_POLICY + " = " + specExecPolicy)
- }
- }
- }
- }
+ }
}
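Note on setProtocolVersion above: versions "1" and "2" are now rejected outright, known names are looked up in a map, and anything else falls back to the driver default. The sketch below reproduces that decision logic without the driver on the classpath. The Version trait and its case objects are hypothetical stand-ins for the driver's ProtocolVersion constants, and IllegalArgumentException is used here instead of the RuntimeException thrown in the commit.

```scala
object ProtocolSketch {
  // Hypothetical stand-ins for the driver's ProtocolVersion constants.
  sealed trait Version
  case object V3 extends Version
  case object V4 extends Version
  case object V5 extends Version
  case object DseV1 extends Version
  case object DseV2 extends Version
  case object Default extends Version

  val mapping: Map[String, Version] = Map(
    "3" -> V3, "4" -> V4, "5" -> V5, "DSE1" -> DseV1, "DSE2" -> DseV2)

  // Reject the legacy versions, otherwise look up with a default fallback.
  def resolve(raw: String): Version = raw match {
    case "1" | "2" =>
      throw new IllegalArgumentException(s"Protocol V$raw isn't supported")
    case other =>
      mapping.getOrElse(other, Default)
  }
}
```

An unrecognised value such as "99" therefore resolves to the default rather than failing, which matches the getOrElse call in the diff.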
diff --git a/cassandra/src/main/scala/org/apache/zeppelin/cassandra/MetaDataHierarchy.scala b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/MetaDataHierarchy.scala
index 4b88776..1f6a814 100644
--- a/cassandra/src/main/scala/org/apache/zeppelin/cassandra/MetaDataHierarchy.scala
+++ b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/MetaDataHierarchy.scala
@@ -18,8 +18,8 @@ package org.apache.zeppelin.cassandra
import java.util.UUID
-import com.datastax.driver.core.utils.UUIDs
-import com.datastax.driver.core.{DataType, TableMetadata}
+import com.datastax.oss.driver.api.core.`type`.DataType
+import com.datastax.oss.driver.api.core.uuid.Uuids
import scala.util.parsing.json.JSONObject
@@ -28,15 +28,14 @@ import scala.util.parsing.json.JSONObject
*/
object MetaDataHierarchy {
object OrderConverter {
- def convert(clusteringOrder: com.datastax.driver.core.ClusteringOrder): ClusteringOrder = {
+ def convert(clusteringOrder: com.datastax.oss.driver.api.core.metadata.schema.ClusteringOrder): ClusteringOrder = {
clusteringOrder match {
- case com.datastax.driver.core.ClusteringOrder.ASC => ASC
- case com.datastax.driver.core.ClusteringOrder.DESC => DESC
+ case com.datastax.oss.driver.api.core.metadata.schema.ClusteringOrder.ASC => ASC
+ case com.datastax.oss.driver.api.core.metadata.schema.ClusteringOrder.DESC => DESC
}
}
}
-
sealed trait ClusteringOrder
object ASC extends ClusteringOrder
object DESC extends ClusteringOrder
@@ -50,33 +49,36 @@ object MetaDataHierarchy {
case class ColumnDetails(name: String, columnType: ColumnType, dataType: DataType)
case class ClusterDetails(name: String, partitioner: String)
- case class ClusterContent(clusterName: String, clusterDetails: String, keyspaces: List[(UUID, String, String)])
- case class AllTables(tables: Map[String,List[String]])
- case class KeyspaceDetails(name: String, replication: Map[String,String], durableWrites: Boolean, asCQL: String, uniqueId: UUID = UUIDs.timeBased()) {
+ case class ClusterContent(clusterName: String, clusterDetails: String, keyspaces: Seq[(UUID, String, String)])
+ case class AllTables(tables: Map[String, Seq[String]])
+ case class KeyspaceDetails(name: String, replication: Map[String,String], durableWrites: Boolean,
+ asCQL: String, uniqueId: UUID = Uuids.timeBased()) {
def getReplicationMap: String = {
JSONObject(replication).toString().replaceAll(""""""","'")
}
}
case class KeyspaceContent(keyspaceName: String, keyspaceDetails: String,
- tables: List[(UUID,String, String)],
- views: List[(UUID,String, String)],
- udts: List[(UUID, String, String)],
- functions: List[(UUID, String, String)],
- aggregates: List[(UUID, String, String)])
- case class TableDetails(tableName: String, columns: List[ColumnDetails], indices: List[IndexDetails], asCQL: String, indicesAsCQL: String, uniqueId: UUID = UUIDs.timeBased())
- case class UDTDetails(typeName: String, columns: List[ColumnDetails], asCQL: String, uniqueId: UUID = UUIDs.timeBased())
+ tables: Seq[(UUID,String, String)],
+ views: Seq[(UUID,String, String)],
+ udts: Seq[(UUID, String, String)],
+ functions: Seq[(UUID, String, String)],
+ aggregates: Seq[(UUID, String, String)])
+ case class TableDetails(tableName: String, columns: Seq[ColumnDetails], indices: Seq[IndexDetails], asCQL: String,
+ indicesAsCQL: String, uniqueId: UUID = Uuids.timeBased())
+ case class UDTDetails(typeName: String, columns: Seq[ColumnDetails], asCQL: String, uniqueId: UUID = Uuids.timeBased())
- case class SameNameFunctionDetails(functions: List[FunctionDetails])
- case class FunctionDetails(keyspace:String, name: String, arguments: List[String], calledOnNullInput: Boolean, returnType: String,
- language:String, body: String, asCQL: String, uniqueId: UUID = UUIDs.timeBased())
- case class FunctionSummary(keyspace:String, name: String, arguments: List[String], returnType: String)
+ case class SameNameFunctionDetails(functions: Seq[FunctionDetails])
+ case class FunctionDetails(keyspace:String, name: String, arguments: Seq[String], calledOnNullInput: Boolean, returnType: String,
+ language:String, body: String, asCQL: String, uniqueId: UUID = Uuids.timeBased())
+ case class FunctionSummary(keyspace:String, name: String, arguments: Seq[String], returnType: String)
- case class AggregateDetails(keyspace:String, name: String, arguments: List[String], sFunc: String, sType: String,
+ case class AggregateDetails(keyspace:String, name: String, arguments: Seq[String], sFunc: String, sType: String,
finalFunc: Option[String], initCond: Option[String], returnType: String,
- asCQL: String, uniqueId: UUID = UUIDs.timeBased())
- case class AggregateSummary(keyspace:String, name: String, arguments: List[String], returnType: String)
- case class SameNameAggregateDetails(aggregates: List[AggregateDetails])
+ asCQL: String, uniqueId: UUID = Uuids.timeBased())
+ case class AggregateSummary(keyspace:String, name: String, arguments: Seq[String], returnType: String)
+ case class SameNameAggregateDetails(aggregates: Seq[AggregateDetails])
- case class MaterializedViewDetails(name: String, columns: List[ColumnDetails], asCQL: String, baseTable: String, uniqueId: UUID = UUIDs.timeBased())
+ case class MaterializedViewDetails(name: String, columns: Seq[ColumnDetails], asCQL: String, baseTable: String,
+ uniqueId: UUID = Uuids.timeBased())
case class MaterializedViewSummary(name: String, baseTable: String)
}
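
The MetaDataHierarchy diff above widens the collection fields of the case classes from `List` to `Seq`. The commit message does not state the motivation, so the following is only a sketch of what the wider type permits: a `Seq` field accepts an existing `List` as well as a collection converted from a Java list. The case class below is a simplified, hypothetical copy of `FunctionSummary`, and the sample values are invented.

```scala
import scala.collection.JavaConverters._

object SeqFieldSketch {
  // Simplified, hypothetical counterpart of FunctionSummary from the diff.
  case class FunctionSummary(keyspace: String, name: String,
                             arguments: Seq[String], returnType: String)

  // A List is a Seq, so call sites that already pass a List keep compiling.
  val fromList: FunctionSummary =
    FunctionSummary("ks", "plus", List("int", "int"), "int")

  // A java.util.List can be passed after asScala.toSeq,
  // without an explicit toList at the call site.
  val javaArgs: java.util.List[String] = java.util.Arrays.asList("text")
  val fromJava: FunctionSummary =
    FunctionSummary("ks", "len", javaArgs.asScala.toSeq, "int")
}
```

`scala.collection.JavaConverters` is used here because it is available on the older Scala versions; on Scala 2.13 and later it is deprecated in favour of `scala.jdk.CollectionConverters`.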
diff --git a/cassandra/src/main/scala/org/apache/zeppelin/cassandra/ParagraphParser.scala b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/ParagraphParser.scala
index 2c198ca..514161e 100644
--- a/cassandra/src/main/scala/org/apache/zeppelin/cassandra/ParagraphParser.scala
+++ b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/ParagraphParser.scala
@@ -16,9 +16,11 @@
*/
package org.apache.zeppelin.cassandra
-import com.datastax.driver.core._
+import com.datastax.oss.driver.api.core.{ConsistencyLevel, DefaultConsistencyLevel}
+import com.datastax.oss.driver.api.core.cql.{BatchType, DefaultBatchType}
import org.apache.zeppelin.cassandra.CassandraInterpreter._
import org.apache.zeppelin.interpreter.InterpreterException
+
import scala.util.matching.Regex
import scala.util.parsing.combinator._
import org.apache.zeppelin.cassandra.TextBlockHierarchy._
@@ -33,25 +35,24 @@ import org.apache.zeppelin.cassandra.TextBlockHierarchy._
*/
object ParagraphParser {
- val CONSISTENCY_LEVEL_PATTERN = ConsistencyLevel.values().toList
- .map(_.name()).filter(!_.contains("SERIAL")).mkString("""^\s*@consistency\s*=\s*(""", "|" , """)\s*$""").r
+ val allConsistencyLevels: Seq[ConsistencyLevel] = DefaultConsistencyLevel.values().toSeq
+
+ val CONSISTENCY_LEVEL_PATTERN: Regex = allConsistencyLevels.filter(!_.isSerial).map(_.name)
+ .mkString("""^\s*@consistency\s*=\s*(""", "|" , """)\s*$""").r
- val SERIAL_CONSISTENCY_LEVEL_PATTERN = ConsistencyLevel.values().toList
- .map(_.name()).filter(_.contains("SERIAL")).mkString("""^\s*@serialConsistency\s*=\s*(""", "|", """)\s*$""").r
- val TIMESTAMP_PATTERN = """^\s*@timestamp\s*=\s*([0-9]+)\s*$""".r
+ val SERIAL_CONSISTENCY_LEVEL_PATTERN: Regex = allConsistencyLevels.filter(_.isSerial).map(_.name)
+ .mkString("""^\s*@serialConsistency\s*=\s*(""", "|", """)\s*$""").r
+ val TIMESTAMP_PATTERN: Regex = """^\s*@timestamp\s*=\s*([0-9]+)\s*$""".r
- val RETRY_POLICIES_PATTERN = List(DEFAULT_POLICY,DOWNGRADING_CONSISTENCY_RETRY, FALLTHROUGH_RETRY,
- LOGGING_DEFAULT_RETRY, LOGGING_DOWNGRADING_RETRY, LOGGING_FALLTHROUGH_RETRY)
- .mkString("""^\s*@retryPolicy\s*=\s*(""", "|" , """)\s*$""").r
- val FETCHSIZE_PATTERN = """^\s*@fetchSize\s*=\s*([0-9]+)\s*$""".r
- val REQUEST_TIMEOUT_PATTERN = """^\s*@requestTimeOut\s*=\s*([0-9]+)\s*$""".r
+ val FETCHSIZE_PATTERN: Regex = """^\s*@fetchSize\s*=\s*([0-9]+)\s*$""".r
+ val REQUEST_TIMEOUT_PATTERN: Regex = """^\s*@requestTimeOut\s*=\s*([0-9]+)\s*$""".r
- val SIMPLE_STATEMENT_PATTERN = """([^;]+;)""".r
- val PREPARE_STATEMENT_PATTERN = """^\s*@prepare\[([^]]+)\]\s*=\s*([^;]+)$""".r
- val REMOVE_PREPARE_STATEMENT_PATTERN = """^\s*@remove_prepare\[([^]]+)\]\s*$""".r
+ val SIMPLE_STATEMENT_PATTERN: Regex = """([^;]+;)""".r
+ val PREPARE_STATEMENT_PATTERN: Regex = """^\s*@prepare\[([^]]+)\]\s*=\s*([^;]+)$""".r
+ val REMOVE_PREPARE_STATEMENT_PATTERN: Regex = """^\s*@remove_prepare\[([^]]+)\]\s*$""".r
- val BIND_PATTERN = """^\s*@bind\[([^]]+)\](?:=([^;]+))?""".r
- val BATCH_PATTERN = """^(?i)\s*BEGIN\s+(UNLOGGED|COUNTER)?\s*BATCH""".r
+ val BIND_PATTERN: Regex = """^\s*@bind\[([^]]+)\](?:=([^;]+))?""".r
+ val BATCH_PATTERN: Regex = """^(?i)\s*BEGIN\s+(UNLOGGED|COUNTER)?\s*BATCH""".r
/**
* Very complicated RegExp
@@ -67,131 +68,129 @@ object ParagraphParser {
* followed by optional white-space(s) (\s*)
* followed by semi-colon (;)
*/
- val UDF_PATTERN = """(?is)\s*(CREATE(?:\s+OR REPLACE)?\s+FUNCTION(?:\s+IF\s+NOT\s+EXISTS)?.+?(?:\s+|\n|\r|\f)AS(?:\s+|\n|\r|\f)(?:'|\$\$).+?(?:'|\$\$)\s*;)""".r
+ val UDF_PATTERN: Regex = """(?is)\s*(CREATE(?:\s+OR REPLACE)?\s+FUNCTION(?:\s+IF\s+NOT\s+EXISTS)?.+?(?:\s+|\n|\r|\f)AS(?:\s+|\n|\r|\f)(?:'|\$\$).+?(?:'|\$\$)\s*;)""".r
- val GENERIC_STATEMENT_PREFIX =
+ val GENERIC_STATEMENT_PREFIX: Regex =
"""(?is)\s*(?:INSERT|UPDATE|DELETE|SELECT|CREATE|ALTER|
|DROP|GRANT|REVOKE|TRUNCATE|LIST|USE)\s+""".r
val VALID_IDENTIFIER = "[a-z][a-z0-9_]*"
- val DESCRIBE_CLUSTER_PATTERN = """^(?i)\s*(?:DESCRIBE|DESC)\s+CLUSTER\s*;\s*$""".r
+ val DESCRIBE_CLUSTER_PATTERN: Regex = """^(?i)\s*(?:DESCRIBE|DESC)\s+CLUSTER\s*;\s*$""".r
- val DESCRIBE_KEYSPACE_PATTERN = ("""^(?i)\s*(?:DESCRIBE|DESC)\s+KEYSPACE\s*("""+VALID_IDENTIFIER+""");\s*$""").r
- val DESCRIBE_KEYSPACES_PATTERN = """^(?i)\s*(?:DESCRIBE|DESC)\s+KEYSPACES\s*;\s*$""".r
+ val DESCRIBE_KEYSPACE_PATTERN: Regex = ("""^(?i)\s*(?:DESCRIBE|DESC)\s+KEYSPACE\s*("""+VALID_IDENTIFIER+""");\s*$""").r
+ val DESCRIBE_KEYSPACES_PATTERN: Regex = """^(?i)\s*(?:DESCRIBE|DESC)\s+KEYSPACES\s*;\s*$""".r
- val DESCRIBE_TABLE_PATTERN = ("""^(?i)\s*(?:DESCRIBE|DESC)\s+TABLE\s*("""+VALID_IDENTIFIER+""");\s*$""").r
- val DESCRIBE_TABLE_WITH_KEYSPACE_PATTERN = ("""^(?i)\s*(?:DESCRIBE|DESC)\s+TABLE\s*(""" +
+ val DESCRIBE_TABLE_PATTERN: Regex = ("""^(?i)\s*(?:DESCRIBE|DESC)\s+TABLE\s*("""+VALID_IDENTIFIER+""");\s*$""").r
+ val DESCRIBE_TABLE_WITH_KEYSPACE_PATTERN: Regex = ("""^(?i)\s*(?:DESCRIBE|DESC)\s+TABLE\s*(""" +
VALID_IDENTIFIER +
""")\.(""" +
VALID_IDENTIFIER +
""");\s*$""").r
- val DESCRIBE_TABLES_PATTERN = """^(?i)\s*(?:DESCRIBE|DESC)\s+TABLES\s*;\s*$""".r
+ val DESCRIBE_TABLES_PATTERN: Regex = """^(?i)\s*(?:DESCRIBE|DESC)\s+TABLES\s*;\s*$""".r
- val DESCRIBE_TYPE_PATTERN = ("""^(?i)\s*(?:DESCRIBE|DESC)\s+TYPE\s*("""+VALID_IDENTIFIER+""");\s*$""").r
- val DESCRIBE_TYPE_WITH_KEYSPACE_PATTERN = ("""^(?i)\s*(?:DESCRIBE|DESC)\s+TYPE\s*(""" +
+ val DESCRIBE_TYPE_PATTERN: Regex = ("""^(?i)\s*(?:DESCRIBE|DESC)\s+TYPE\s*("""+VALID_IDENTIFIER+""");\s*$""").r
+ val DESCRIBE_TYPE_WITH_KEYSPACE_PATTERN: Regex = ("""^(?i)\s*(?:DESCRIBE|DESC)\s+TYPE\s*(""" +
VALID_IDENTIFIER +
""")\.(""" +
VALID_IDENTIFIER +
""");\s*$""").r
- val DESCRIBE_TYPES_PATTERN = """^(?i)\s*(?:DESCRIBE|DESC)\s+TYPES\s*;\s*$""".r
+ val DESCRIBE_TYPES_PATTERN: Regex = """^(?i)\s*(?:DESCRIBE|DESC)\s+TYPES\s*;\s*$""".r
- val DESCRIBE_FUNCTION_PATTERN = ("""^(?i)\s*(?:DESCRIBE|DESC)\s+FUNCTION\s*("""+VALID_IDENTIFIER+""");\s*$""").r
- val DESCRIBE_FUNCTION_WITH_KEYSPACE_PATTERN = ("""^(?i)\s*(?:DESCRIBE|DESC)\s+FUNCTION\s*(""" +
+ val DESCRIBE_FUNCTION_PATTERN: Regex = ("""^(?i)\s*(?:DESCRIBE|DESC)\s+FUNCTION\s*("""+VALID_IDENTIFIER+""");\s*$""").r
+ val DESCRIBE_FUNCTION_WITH_KEYSPACE_PATTERN: Regex = ("""^(?i)\s*(?:DESCRIBE|DESC)\s+FUNCTION\s*(""" +
VALID_IDENTIFIER +
""")\.(""" +
VALID_IDENTIFIER +
""");\s*$""").r
- val DESCRIBE_FUNCTIONS_PATTERN = ("""^(?i)\s*(?:DESCRIBE|DESC)\s+FUNCTIONS\s*;\s*$""").r
+ val DESCRIBE_FUNCTIONS_PATTERN: Regex = """^(?i)\s*(?:DESCRIBE|DESC)\s+FUNCTIONS\s*;\s*$""".r
- val DESCRIBE_AGGREGATE_PATTERN = ("""^(?i)\s*(?:DESCRIBE|DESC)\s+AGGREGATE\s*("""+VALID_IDENTIFIER+""");\s*$""").r
- val DESCRIBE_AGGREGATE_WITH_KEYSPACE_PATTERN = ("""^(?i)\s*(?:DESCRIBE|DESC)\s+AGGREGATE\s*(""" +
+ val DESCRIBE_AGGREGATE_PATTERN: Regex = ("""^(?i)\s*(?:DESCRIBE|DESC)\s+AGGREGATE\s*("""+VALID_IDENTIFIER+""");\s*$""").r
+ val DESCRIBE_AGGREGATE_WITH_KEYSPACE_PATTERN: Regex = ("""^(?i)\s*(?:DESCRIBE|DESC)\s+AGGREGATE\s*(""" +
VALID_IDENTIFIER +
""")\.(""" +
VALID_IDENTIFIER +
""");\s*$""").r
- val DESCRIBE_AGGREGATES_PATTERN = ("""^(?i)\s*(?:DESCRIBE|DESC)\s+AGGREGATES\s*;\s*$""").r
+ val DESCRIBE_AGGREGATES_PATTERN: Regex = """^(?i)\s*(?:DESCRIBE|DESC)\s+AGGREGATES\s*;\s*$""".r
- val DESCRIBE_MATERIALIZED_VIEW_PATTERN = ("""^(?i)\s*(?:DESCRIBE|DESC)\s+MATERIALIZED\s+VIEW\s*("""+VALID_IDENTIFIER+""")\s*;\s*$""").r
- val DESCRIBE_MATERIALIZED_VIEW_WITH_KEYSPACE_PATTERN = ("""^(?i)\s*(?:DESCRIBE|DESC)\s+MATERIALIZED\s+VIEW\s*(""" +
+ val DESCRIBE_MATERIALIZED_VIEW_PATTERN: Regex = ("""^(?i)\s*(?:DESCRIBE|DESC)\s+MATERIALIZED\s+VIEW\s*("""+VALID_IDENTIFIER+""")\s*;\s*$""").r
+ val DESCRIBE_MATERIALIZED_VIEW_WITH_KEYSPACE_PATTERN: Regex = ("""^(?i)\s*(?:DESCRIBE|DESC)\s+MATERIALIZED\s+VIEW\s*(""" +
VALID_IDENTIFIER +
""")\.(""" +
VALID_IDENTIFIER +
""")\s*;\s*$""").r
- val DESCRIBE_MATERIALIZED_VIEWS_PATTERN = ("""^(?i)\s*(?:DESCRIBE|DESC)\s+MATERIALIZED\s+VIEWS\s*;\s*$""").r
-
+ val DESCRIBE_MATERIALIZED_VIEWS_PATTERN: Regex = """^(?i)\s*(?:DESCRIBE|DESC)\s+MATERIALIZED\s+VIEWS\s*;\s*$""".r
- val HELP_PATTERN = """^(?i)\s*HELP;\s*$""".r
+ val HELP_PATTERN: Regex = """^(?i)\s*HELP;\s*$""".r
}
class ParagraphParser extends RegexParsers{
-
import ParagraphParser._
- def singleLineCommentHash: Parser[Comment] = """\s*#.*""".r ^^ {case text => Comment(text.trim.replaceAll("#",""))}
- def singleLineCommentDoubleSlashes: Parser[Comment] = """\s*//.*""".r ^^ {case text => Comment(text.trim.replaceFirst("//\\s*",""))}
- def singleLineCommentDoubleDash: Parser[Comment] = """\s*--.*""".r ^^ {case text => Comment(text.trim.replaceFirst("//\\s*",""))}
+ def singleLineCommentHash: Parser[Comment] = """\s*#.*""".r ^^ {text => Comment(text.trim.replaceAll("#",""))}
+ def singleLineCommentDoubleSlashes: Parser[Comment] = """\s*//.*""".r ^^ {text => Comment(text.trim.replaceFirst("//\\s*",""))}
+ def singleLineCommentDoubleDash: Parser[Comment] = """\s*--.*""".r ^^ {text => Comment(text.trim.replaceFirst("--\\s*",""))}
def singleLineComment: Parser[Comment] = singleLineCommentHash | singleLineCommentDoubleSlashes | singleLineCommentDoubleDash
- def multiLineComment: Parser[Comment] = """(?s)/\*(.*)\*/""".r ^^ {case text => Comment(text.trim.replaceAll("""/\*""","").replaceAll("""\*/""",""))}
+ def multiLineComment: Parser[Comment] = """(?s)/\*(.*)\*/""".r ^^ {text => Comment(text.trim.replaceAll("""/\*""","").replaceAll("""\*/""",""))}
//Query parameters
- def consistency: Parser[Consistency] = """\s*@consistency.+""".r ^^ {case x => extractConsistency(x.trim)}
- def serialConsistency: Parser[SerialConsistency] = """\s*@serialConsistency.+""".r ^^ {case x => extractSerialConsistency(x.trim)}
- def timestamp: Parser[Timestamp] = """\s*@timestamp.+""".r ^^ {case x => extractTimestamp(x.trim)}
- def retryPolicy: Parser[RetryPolicy] = """\s*@retryPolicy.+""".r ^^ {case x => extractRetryPolicy(x.trim)}
- def fetchSize: Parser[FetchSize] = """\s*@fetchSize.+""".r ^^ {case x => extractFetchSize(x.trim)}
- def requestTimeOut: Parser[RequestTimeOut] = """\s*@requestTimeOut.+""".r ^^ {case x => extractRequestTimeOut(x.trim)}
+ def consistency: Parser[Consistency] = """\s*@consistency.+""".r ^^ {x => extractConsistency(x.trim)}
+ def serialConsistency: Parser[SerialConsistency] = """\s*@serialConsistency.+""".r ^^ {x => extractSerialConsistency(x.trim)}
+ def timestamp: Parser[Timestamp] = """\s*@timestamp.+""".r ^^ {x => extractTimestamp(x.trim)}
+ def fetchSize: Parser[FetchSize] = """\s*@fetchSize.+""".r ^^ {x => extractFetchSize(x.trim)}
+ def requestTimeOut: Parser[RequestTimeOut] = """\s*@requestTimeOut.+""".r ^^ {x => extractRequestTimeOut(x.trim)}
//Statements
- def createFunctionStatement: Parser[SimpleStm] = UDF_PATTERN ^^{case x => extractUdfStatement(x.trim)}
- def genericStatement: Parser[SimpleStm] = s"""$GENERIC_STATEMENT_PREFIX[^;]+;""".r ^^ {case x => extractSimpleStatement(x.trim)}
+ def createFunctionStatement(): Parser[SimpleStm] = UDF_PATTERN ^^{x => extractUdfStatement(x.trim)}
+ def genericStatement(): Parser[SimpleStm] = s"""$GENERIC_STATEMENT_PREFIX[^;]+;""".r ^^ {x => extractSimpleStatement(x.trim)}
// def allStatement: Parser[SimpleStm] = udfStatement | genericStatement
- def prepare: Parser[PrepareStm] = """\s*@prepare.+""".r ^^ {case x => extractPreparedStatement(x.trim)}
- def removePrepare: Parser[RemovePrepareStm] = """\s*@remove_prepare.+""".r ^^ {case x => extractRemovePreparedStatement(x.trim)}
- def bind: Parser[BoundStm] = """\s*@bind.+""".r ^^ {case x => extractBoundStatement(x.trim)}
+ def prepare(): Parser[PrepareStm] = """\s*@prepare.+""".r ^^ {x => extractPreparedStatement(x.trim)}
+ def removePrepare(): Parser[RemovePrepareStm] = """\s*@remove_prepare.+""".r ^^ {x => extractRemovePreparedStatement(x.trim)}
+ def bind(): Parser[BoundStm] = """\s*@bind.+""".r ^^ {x => extractBoundStatement(x.trim)}
//Meta data
- private def describeCluster: Parser[DescribeClusterCmd] = """(?i)\s*(?:DESCRIBE|DESC)\s+CLUSTER.*""".r ^^ {extractDescribeClusterCmd(_)}
- private def describeKeyspaces: Parser[DescribeKeyspacesCmd] = """(?i)\s*(?:DESCRIBE|DESC)\s+KEYSPACES.*""".r ^^ {extractDescribeKeyspacesCmd(_)}
- private def describeTables: Parser[DescribeTablesCmd] = """(?i)\s*(?:DESCRIBE|DESC)\s+TABLES.*""".r ^^ {extractDescribeTablesCmd(_)}
- private def describeTypes: Parser[DescribeTypesCmd] = """(?i)\s*(?:DESCRIBE|DESC)\s+TYPES.*""".r ^^ {extractDescribeTypesCmd(_)}
- private def describeFunctions: Parser[DescribeFunctionsCmd] = """(?i)\s*(?:DESCRIBE|DESC)\s+FUNCTIONS.*""".r ^^ {extractDescribeFunctionsCmd(_)}
- private def describeAggregates: Parser[DescribeAggregatesCmd] = """(?i)\s*(?:DESCRIBE|DESC)\s+AGGREGATES.*""".r ^^ {extractDescribeAggregatesCmd(_)}
- private def describeMaterializedViews: Parser[DescribeMaterializedViewsCmd] = """(?i)\s*(?:DESCRIBE|DESC)\s+MATERIALIZED\s+VIEWS.*""".r ^^ {extractDescribeMaterializedViewsCmd(_)}
- private def describeKeyspace: Parser[DescribeKeyspaceCmd] = """\s*(?i)(?:DESCRIBE|DESC)\s+KEYSPACE\s+.+""".r ^^ {extractDescribeKeyspaceCmd(_)}
- private def describeTable: Parser[DescribeTableCmd] = """(?i)\s*(?:DESCRIBE|DESC)\s+TABLE\s+.+""".r ^^ {extractDescribeTableCmd(_)}
- private def describeType: Parser[DescribeTypeCmd] = """(?i)\s*(?:DESCRIBE|DESC)\s+TYPE\s+.*""".r ^^ {extractDescribeTypeCmd(_)}
- private def describeFunction: Parser[DescribeFunctionCmd] = """(?i)\s*(?:DESCRIBE|DESC)\s+FUNCTION\s+.*""".r ^^ {extractDescribeFunctionCmd(_)}
- private def describeAggregate: Parser[DescribeAggregateCmd] = """(?i)\s*(?:DESCRIBE|DESC)\s+AGGREGATE\s+.*""".r ^^ {extractDescribeAggregateCmd(_)}
- private def describeMaterializedView: Parser[DescribeMaterializedViewCmd] = """(?i)\s*(?:DESCRIBE|DESC)\s+MATERIALIZED\s+VIEW\s+.*""".r ^^ {extractDescribeMaterializedViewCmd(_)}
+ private def describeCluster: Parser[DescribeClusterCmd] = """(?i)\s*(?:DESCRIBE|DESC)\s+CLUSTER.*""".r ^^ extractDescribeClusterCmd
+ private def describeKeyspaces: Parser[DescribeKeyspacesCmd] = """(?i)\s*(?:DESCRIBE|DESC)\s+KEYSPACES.*""".r ^^ extractDescribeKeyspacesCmd
+ private def describeTables: Parser[DescribeTablesCmd] = """(?i)\s*(?:DESCRIBE|DESC)\s+TABLES.*""".r ^^ extractDescribeTablesCmd
+ private def describeTypes: Parser[DescribeTypesCmd] = """(?i)\s*(?:DESCRIBE|DESC)\s+TYPES.*""".r ^^ extractDescribeTypesCmd
+ private def describeFunctions: Parser[DescribeFunctionsCmd] = """(?i)\s*(?:DESCRIBE|DESC)\s+FUNCTIONS.*""".r ^^ extractDescribeFunctionsCmd
+ private def describeAggregates: Parser[DescribeAggregatesCmd] = """(?i)\s*(?:DESCRIBE|DESC)\s+AGGREGATES.*""".r ^^ extractDescribeAggregatesCmd
+ private def describeMaterializedViews: Parser[DescribeMaterializedViewsCmd] = """(?i)\s*(?:DESCRIBE|DESC)\s+MATERIALIZED\s+VIEWS.*""".r ^^ extractDescribeMaterializedViewsCmd
+ private def describeKeyspace: Parser[DescribeKeyspaceCmd] = """\s*(?i)(?:DESCRIBE|DESC)\s+KEYSPACE\s+.+""".r ^^ extractDescribeKeyspaceCmd
+ private def describeTable: Parser[DescribeTableCmd] = """(?i)\s*(?:DESCRIBE|DESC)\s+TABLE\s+.+""".r ^^ extractDescribeTableCmd
+ private def describeType: Parser[DescribeTypeCmd] = """(?i)\s*(?:DESCRIBE|DESC)\s+TYPE\s+.*""".r ^^ extractDescribeTypeCmd
+ private def describeFunction: Parser[DescribeFunctionCmd] = """(?i)\s*(?:DESCRIBE|DESC)\s+FUNCTION\s+.*""".r ^^ extractDescribeFunctionCmd
+ private def describeAggregate: Parser[DescribeAggregateCmd] = """(?i)\s*(?:DESCRIBE|DESC)\s+AGGREGATE\s+.*""".r ^^ extractDescribeAggregateCmd
+ private def describeMaterializedView: Parser[DescribeMaterializedViewCmd] = """(?i)\s*(?:DESCRIBE|DESC)\s+MATERIALIZED\s+VIEW\s+.*""".r ^^ extractDescribeMaterializedViewCmd
//Help
- private def helpCommand: Parser[HelpCmd] = """(?i)\s*HELP.*""".r ^^{extractHelpCmd(_)}
+ private def helpCommand: Parser[HelpCmd] = """(?i)\s*HELP.*""".r ^^ extractHelpCmd
private def beginBatch: Parser[String] = """(?i)\s*BEGIN\s+(UNLOGGED|COUNTER)?\s*BATCH""".r
private def applyBatch: Parser[String] = """(?i)APPLY BATCH;""".r
- private def insert: Parser[SimpleStm] = """(?i)INSERT [^;]+;""".r ^^{SimpleStm(_)}
- private def update: Parser[SimpleStm] = """(?i)UPDATE [^;]+;""".r ^^{SimpleStm(_)}
- private def delete: Parser[SimpleStm] = """(?i)DELETE [^;]+;""".r ^^{SimpleStm(_)}
+ private def insert: Parser[SimpleStm] = """(?i)INSERT [^;]+;""".r ^^ SimpleStm
+ private def update: Parser[SimpleStm] = """(?i)UPDATE [^;]+;""".r ^^ SimpleStm
+ private def delete: Parser[SimpleStm] = """(?i)DELETE [^;]+;""".r ^^ SimpleStm
private def mutationStatements: Parser[List[QueryStatement]] = rep(insert | update | delete | bind)
def batch: Parser[BatchStm] = beginBatch ~ mutationStatements ~ applyBatch ^^ {
- case begin ~ cqls ~ end => BatchStm(extractBatchType(begin),cqls)}
+ case begin ~ cqls ~ end => BatchStm(extractBatchType(begin), cqls)
+ }
def queries:Parser[List[AnyBlock]] = rep(singleLineComment | multiLineComment | consistency | serialConsistency |
- timestamp | retryPolicy | fetchSize | requestTimeOut | removePrepare | prepare | bind | batch | describeCluster |
+ timestamp | fetchSize | requestTimeOut | removePrepare | prepare | bind | batch | describeCluster |
describeKeyspace | describeKeyspaces |
describeTable | describeTables |
describeType | describeTypes |
@@ -202,7 +201,7 @@ class ParagraphParser extends RegexParsers{
def extractConsistency(text: String): Consistency = {
text match {
- case CONSISTENCY_LEVEL_PATTERN(consistency) => Consistency(ConsistencyLevel.valueOf(consistency))
+ case CONSISTENCY_LEVEL_PATTERN(consistency) => Consistency(DefaultConsistencyLevel.valueOf(consistency))
case _ => throw new InterpreterException(s"Invalid syntax for @consistency. " +
s"It should comply to the pattern ${CONSISTENCY_LEVEL_PATTERN.toString}")
}
@@ -210,7 +209,8 @@ class ParagraphParser extends RegexParsers{
def extractSerialConsistency(text: String): SerialConsistency = {
text match {
- case SERIAL_CONSISTENCY_LEVEL_PATTERN(consistency) => SerialConsistency(ConsistencyLevel.valueOf(consistency))
+ case SERIAL_CONSISTENCY_LEVEL_PATTERN(consistency) =>
+ SerialConsistency(DefaultConsistencyLevel.valueOf(consistency))
case _ => throw new InterpreterException(s"Invalid syntax for @serialConsistency. " +
s"It should comply to the pattern ${SERIAL_CONSISTENCY_LEVEL_PATTERN.toString}")
}
@@ -224,24 +224,10 @@ class ParagraphParser extends RegexParsers{
}
}
- def extractRetryPolicy(text: String): RetryPolicy = {
- text match {
- case RETRY_POLICIES_PATTERN(retry) => retry.trim match {
- case DEFAULT_POLICY => DefaultRetryPolicy
- case DOWNGRADING_CONSISTENCY_RETRY => DowngradingRetryPolicy
- case FALLTHROUGH_RETRY => FallThroughRetryPolicy
- case LOGGING_DEFAULT_RETRY => LoggingDefaultRetryPolicy
- case LOGGING_DOWNGRADING_RETRY => LoggingDowngradingRetryPolicy
- case LOGGING_FALLTHROUGH_RETRY => LoggingFallThroughRetryPolicy
- }
- case _ => throw new InterpreterException(s"Invalid syntax for @retryPolicy. " +
- s"It should comply to the pattern ${RETRY_POLICIES_PATTERN.toString}")
- }
- }
-
def extractFetchSize(text: String): FetchSize = {
text match {
- case FETCHSIZE_PATTERN(fetchSize) => FetchSize(fetchSize.trim.toInt)
+ case FETCHSIZE_PATTERN(fetchSize) =>
+ FetchSize(fetchSize.trim.toInt)
case _ => throw new InterpreterException(s"Invalid syntax for @fetchSize. " +
s"It should comply to the pattern ${FETCHSIZE_PATTERN.toString}")
}
@@ -249,7 +235,8 @@ class ParagraphParser extends RegexParsers{
def extractRequestTimeOut(text: String): RequestTimeOut = {
text match {
- case REQUEST_TIMEOUT_PATTERN(requestTimeOut) => RequestTimeOut(requestTimeOut.trim.toInt)
+ case REQUEST_TIMEOUT_PATTERN(requestTimeOut) =>
+ RequestTimeOut(requestTimeOut.trim.toInt)
case _ => throw new InterpreterException(s"Invalid syntax for @requestTimeOut. " +
s"It should comply to the pattern ${REQUEST_TIMEOUT_PATTERN.toString}")
}
@@ -257,21 +244,24 @@ class ParagraphParser extends RegexParsers{
def extractSimpleStatement(text: String): SimpleStm = {
text match {
- case SIMPLE_STATEMENT_PATTERN(statement) => SimpleStm(statement)
+ case SIMPLE_STATEMENT_PATTERN(statement) =>
+ SimpleStm(statement)
case _ => throw new InterpreterException(s"Invalid statement '$text'. Did you forget to add ; (semi-colon) at the end of each CQL statement ?")
}
}
def extractUdfStatement(text: String): SimpleStm = {
text match {
- case UDF_PATTERN(statement) => SimpleStm(statement)
+ case UDF_PATTERN(statement) =>
+ SimpleStm(statement)
case _ => throw new InterpreterException(s"Invalid statement '$text' for UDF creation. Did you forget to add ; (semi-colon) at the end of each CQL statement ?")
}
}
def extractPreparedStatement(text: String): PrepareStm = {
text match {
- case PREPARE_STATEMENT_PATTERN(name,queryString) => PrepareStm(name.trim,queryString.trim)
+ case PREPARE_STATEMENT_PATTERN(name,queryString) =>
+ PrepareStm(name.trim,queryString.trim)
case _ => throw new InterpreterException(s"Invalid syntax for @prepare. " +
s"It should comply to the pattern: @prepare[prepared_statement_name]=CQL Statement (without semi-colon)")
}
@@ -279,7 +269,8 @@ class ParagraphParser extends RegexParsers{
def extractRemovePreparedStatement(text: String): RemovePrepareStm= {
text match {
- case REMOVE_PREPARE_STATEMENT_PATTERN(name) => RemovePrepareStm(name.trim)
+ case REMOVE_PREPARE_STATEMENT_PATTERN(name) =>
+ RemovePrepareStm(name.trim)
case _ => throw new InterpreterException(s"Invalid syntax for @remove_prepare. " +
s"It should comply to the pattern: @remove_prepare[prepared_statement_name]")
}
@@ -287,18 +278,19 @@ class ParagraphParser extends RegexParsers{
def extractBoundStatement(text: String): BoundStm = {
text match {
- case BIND_PATTERN(name,boundValues) => BoundStm(name.trim, Option(boundValues).map(_.trim).getOrElse(""))
+ case BIND_PATTERN(name,boundValues) =>
+ BoundStm(name.trim, Option(boundValues).map(_.trim).getOrElse(""))
case _ => throw new InterpreterException("Invalid syntax for @bind. It should comply to the pattern: " +
"@bind[prepared_statement_name]=10,'jdoe','John DOE',12345,'2015-07-32 12:04:23.234' " +
"OR @bind[prepared_statement_name] with no bound value. No semi-colon")
}
}
- def extractBatchType(text: String): BatchStatement.Type = {
+ def extractBatchType(text: String): BatchType = {
text match {
case BATCH_PATTERN(batchType) =>
val inferredType = Option(batchType).getOrElse("LOGGED")
- BatchStatement.Type.valueOf(inferredType.toUpperCase)
+ DefaultBatchType.valueOf(inferredType.toUpperCase)
case _ => throw new InterpreterException(s"Invalid syntax for BEGIN BATCH. " +
s"""It should comply to the pattern: ${BATCH_PATTERN.toString}""")
}
@@ -306,7 +298,8 @@ class ParagraphParser extends RegexParsers{
def extractDescribeClusterCmd(text: String): DescribeClusterCmd = {
text match {
- case DESCRIBE_CLUSTER_PATTERN() => new DescribeClusterCmd
+ case DESCRIBE_CLUSTER_PATTERN() =>
+ new DescribeClusterCmd
case _ => throw new InterpreterException(s"Invalid syntax for DESCRIBE CLUSTER. " +
s"""It should comply to the pattern: ${DESCRIBE_CLUSTER_PATTERN.toString}""")
}
@@ -314,7 +307,8 @@ class ParagraphParser extends RegexParsers{
def extractDescribeKeyspaceCmd(text: String): DescribeKeyspaceCmd = {
text match {
- case DESCRIBE_KEYSPACE_PATTERN(keyspace) => new DescribeKeyspaceCmd(keyspace)
+ case DESCRIBE_KEYSPACE_PATTERN(keyspace) =>
+ DescribeKeyspaceCmd(keyspace)
case _ => throw new InterpreterException(s"Invalid syntax for DESCRIBE KEYSPACE. " +
s"""It should comply to the pattern: ${DESCRIBE_KEYSPACE_PATTERN.toString}""")
}
@@ -322,7 +316,8 @@ class ParagraphParser extends RegexParsers{
def extractDescribeKeyspacesCmd(text: String): DescribeKeyspacesCmd = {
text match {
- case DESCRIBE_KEYSPACES_PATTERN() => new DescribeKeyspacesCmd
+ case DESCRIBE_KEYSPACES_PATTERN() =>
+ new DescribeKeyspacesCmd
case _ => throw new InterpreterException(s"Invalid syntax for DESCRIBE KEYSPACES. " +
s"""It should comply to the pattern: ${DESCRIBE_KEYSPACES_PATTERN.toString}""")
}
@@ -330,8 +325,10 @@ class ParagraphParser extends RegexParsers{
def extractDescribeTableCmd(text: String): DescribeTableCmd = {
text match {
- case DESCRIBE_TABLE_WITH_KEYSPACE_PATTERN(keyspace,table) => new DescribeTableCmd(Option(keyspace),table)
- case DESCRIBE_TABLE_PATTERN(table) => new DescribeTableCmd(Option.empty,table)
+ case DESCRIBE_TABLE_WITH_KEYSPACE_PATTERN(keyspace,table) =>
+ DescribeTableCmd(Option(keyspace),table)
+ case DESCRIBE_TABLE_PATTERN(table) =>
+ DescribeTableCmd(Option.empty,table)
case _ => throw new InterpreterException(s"Invalid syntax for DESCRIBE TABLE. " +
s"""It should comply to the patterns: ${DESCRIBE_TABLE_WITH_KEYSPACE_PATTERN.toString} or ${DESCRIBE_TABLE_PATTERN.toString}""".stripMargin)
}
@@ -339,7 +336,8 @@ class ParagraphParser extends RegexParsers{
def extractDescribeTablesCmd(text: String): DescribeTablesCmd = {
text match {
- case DESCRIBE_TABLES_PATTERN() => new DescribeTablesCmd
+ case DESCRIBE_TABLES_PATTERN() =>
+ new DescribeTablesCmd
case _ => throw new InterpreterException(s"Invalid syntax for DESCRIBE TABLES. " +
s"""It should comply to the pattern: ${DESCRIBE_TABLES_PATTERN.toString}""")
}
@@ -347,8 +345,10 @@ class ParagraphParser extends RegexParsers{
def extractDescribeTypeCmd(text: String): DescribeTypeCmd = {
text match {
- case DESCRIBE_TYPE_WITH_KEYSPACE_PATTERN(keyspace,table) => new DescribeTypeCmd(Option(keyspace),table)
- case DESCRIBE_TYPE_PATTERN(table) => new DescribeTypeCmd(Option.empty,table)
+ case DESCRIBE_TYPE_WITH_KEYSPACE_PATTERN(keyspace,table) =>
+ DescribeTypeCmd(Option(keyspace),table)
+ case DESCRIBE_TYPE_PATTERN(table) =>
+ DescribeTypeCmd(Option.empty,table)
case _ => throw new InterpreterException(s"Invalid syntax for DESCRIBE TYPE. " +
s"""It should comply to the patterns: ${DESCRIBE_TYPE_WITH_KEYSPACE_PATTERN.toString} or ${DESCRIBE_TYPE_PATTERN.toString}""".stripMargin)
}
@@ -356,7 +356,8 @@ class ParagraphParser extends RegexParsers{
def extractDescribeTypesCmd(text: String): DescribeTypesCmd = {
text match {
- case DESCRIBE_TYPES_PATTERN() => new DescribeTypesCmd
+ case DESCRIBE_TYPES_PATTERN() =>
+ new DescribeTypesCmd
case _ => throw new InterpreterException(s"Invalid syntax for DESCRIBE TYPES. " +
s"""It should comply to the pattern: ${DESCRIBE_TYPES_PATTERN.toString}""")
}
@@ -364,8 +365,10 @@ class ParagraphParser extends RegexParsers{
def extractDescribeFunctionCmd(text: String): DescribeFunctionCmd = {
text match {
- case DESCRIBE_FUNCTION_WITH_KEYSPACE_PATTERN(keyspace,function) => new DescribeFunctionCmd(Option(keyspace),function)
- case DESCRIBE_FUNCTION_PATTERN(function) => new DescribeFunctionCmd(Option.empty,function)
+ case DESCRIBE_FUNCTION_WITH_KEYSPACE_PATTERN(keyspace,function) =>
+ DescribeFunctionCmd(Option(keyspace),function)
+ case DESCRIBE_FUNCTION_PATTERN(function) =>
+ DescribeFunctionCmd(Option.empty,function)
case _ => throw new InterpreterException(s"Invalid syntax for DESCRIBE FUNCTION. " +
s"""It should comply to the patterns: ${DESCRIBE_FUNCTION_WITH_KEYSPACE_PATTERN.toString} or ${DESCRIBE_FUNCTION_PATTERN.toString}""".stripMargin)
}
@@ -373,7 +376,8 @@ class ParagraphParser extends RegexParsers{
def extractDescribeFunctionsCmd(text: String): DescribeFunctionsCmd = {
text match {
- case DESCRIBE_FUNCTIONS_PATTERN() => new DescribeFunctionsCmd
+ case DESCRIBE_FUNCTIONS_PATTERN() =>
+ new DescribeFunctionsCmd
case _ => throw new InterpreterException(s"Invalid syntax for DESCRIBE FUNCTIONS. " +
s"""It should comply to the pattern: ${DESCRIBE_FUNCTIONS_PATTERN.toString}""".stripMargin)
}
@@ -381,8 +385,10 @@ class ParagraphParser extends RegexParsers{
def extractDescribeAggregateCmd(text: String): DescribeAggregateCmd = {
text match {
- case DESCRIBE_AGGREGATE_WITH_KEYSPACE_PATTERN(keyspace,aggregate) => new DescribeAggregateCmd(Option(keyspace),aggregate)
- case DESCRIBE_AGGREGATE_PATTERN(aggregate) => new DescribeAggregateCmd(Option.empty,aggregate)
+ case DESCRIBE_AGGREGATE_WITH_KEYSPACE_PATTERN(keyspace,aggregate) =>
+ DescribeAggregateCmd(Option(keyspace),aggregate)
+ case DESCRIBE_AGGREGATE_PATTERN(aggregate) =>
+ DescribeAggregateCmd(Option.empty,aggregate)
case _ => throw new InterpreterException(s"Invalid syntax for DESCRIBE AGGREGATE. " +
s"""It should comply to the patterns: ${DESCRIBE_AGGREGATE_WITH_KEYSPACE_PATTERN.toString} or ${DESCRIBE_AGGREGATE_PATTERN.toString}""".stripMargin)
}
@@ -390,7 +396,8 @@ class ParagraphParser extends RegexParsers{
def extractDescribeAggregatesCmd(text: String): DescribeAggregatesCmd = {
text match {
- case DESCRIBE_AGGREGATES_PATTERN() => new DescribeAggregatesCmd
+ case DESCRIBE_AGGREGATES_PATTERN() =>
+ new DescribeAggregatesCmd
case _ => throw new InterpreterException(s"Invalid syntax for DESCRIBE AGGREGATES. " +
s"""It should comply to the pattern: ${DESCRIBE_AGGREGATES_PATTERN.toString}""".stripMargin)
}
@@ -398,8 +405,10 @@ class ParagraphParser extends RegexParsers{
def extractDescribeMaterializedViewCmd(text: String): DescribeMaterializedViewCmd = {
text match {
- case DESCRIBE_MATERIALIZED_VIEW_WITH_KEYSPACE_PATTERN(keyspace,view) => new DescribeMaterializedViewCmd(Option(keyspace),view)
- case DESCRIBE_MATERIALIZED_VIEW_PATTERN(view) => new DescribeMaterializedViewCmd(Option.empty,view)
+ case DESCRIBE_MATERIALIZED_VIEW_WITH_KEYSPACE_PATTERN(keyspace,view) =>
+ DescribeMaterializedViewCmd(Option(keyspace),view)
+ case DESCRIBE_MATERIALIZED_VIEW_PATTERN(view) =>
+ DescribeMaterializedViewCmd(Option.empty,view)
case _ => throw new InterpreterException(s"Invalid syntax for DESCRIBE MATERIALIZED VIEW. " +
s"""It should comply to the patterns: ${DESCRIBE_MATERIALIZED_VIEW_WITH_KEYSPACE_PATTERN.toString} or ${DESCRIBE_MATERIALIZED_VIEW_PATTERN.toString}""".stripMargin)
}
@@ -407,7 +416,8 @@ class ParagraphParser extends RegexParsers{
def extractDescribeMaterializedViewsCmd(text: String): DescribeMaterializedViewsCmd = {
text match {
- case DESCRIBE_MATERIALIZED_VIEWS_PATTERN() => new DescribeMaterializedViewsCmd
+ case DESCRIBE_MATERIALIZED_VIEWS_PATTERN() =>
+ new DescribeMaterializedViewsCmd
case _ => throw new InterpreterException(s"Invalid syntax for DESCRIBE MATERIALIZED VIEWS. " +
s"""It should comply to the pattern: ${DESCRIBE_MATERIALIZED_VIEWS_PATTERN.toString}""".stripMargin)
}
@@ -415,7 +425,8 @@ class ParagraphParser extends RegexParsers{
def extractHelpCmd(text: String): HelpCmd = {
text match {
- case HELP_PATTERN() => new HelpCmd
+ case HELP_PATTERN() =>
+ new HelpCmd
case _ => throw new InterpreterException(s"Invalid syntax for HELP. " +
s"""It should comply to the patterns: ${HELP_PATTERN.toString}""".stripMargin)
}
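For readers following the `ParagraphParser` hunks above: each `extractDescribe...Cmd` method tries the keyspace-qualified pattern first, then the bare pattern, and throws on anything else. Below is a minimal Java sketch of that dispatch, under stated assumptions. The two regular expressions and the class `DescribeTypeParserSketch` are hypothetical stand-ins; the real `DESCRIBE_TYPE_WITH_KEYSPACE_PATTERN` and `DESCRIBE_TYPE_PATTERN` are defined elsewhere in `ParagraphParser` and are not shown in this diff.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class DescribeTypeParserSketch {
    // Hypothetical patterns; the interpreter's actual regexes may differ.
    static final Pattern WITH_KEYSPACE =
        Pattern.compile("(?i)^\\s*DESCRIBE\\s+TYPE\\s+(\\w+)\\.(\\w+)\\s*;\\s*$");
    static final Pattern WITHOUT_KEYSPACE =
        Pattern.compile("(?i)^\\s*DESCRIBE\\s+TYPE\\s+(\\w+)\\s*;\\s*$");

    /** Illustrative counterpart of the Scala DescribeTypeCmd(Option[String], String). */
    public static final class DescribeTypeCmd {
        public final Optional<String> keyspace;
        public final String name;

        DescribeTypeCmd(Optional<String> keyspace, String name) {
            this.keyspace = keyspace;
            this.name = name;
        }
    }

    /** Tries the keyspace-qualified form first, then the bare form, else fails. */
    public static DescribeTypeCmd parse(String text) {
        Matcher qualified = WITH_KEYSPACE.matcher(text);
        if (qualified.matches()) {
            return new DescribeTypeCmd(Optional.of(qualified.group(1)), qualified.group(2));
        }
        Matcher bare = WITHOUT_KEYSPACE.matcher(text);
        if (bare.matches()) {
            return new DescribeTypeCmd(Optional.empty(), bare.group(1));
        }
        throw new IllegalArgumentException("Invalid syntax for DESCRIBE TYPE: " + text);
    }

    public static void main(String[] args) {
        DescribeTypeCmd cmd = parse("DESCRIBE TYPE zeppelin.address;");
        System.out.println(cmd.keyspace + " / " + cmd.name);
    }
}
```

The order of the two checks matters only if the bare pattern could also match a qualified name; with the assumed regexes it cannot, because `\w+` does not match a dot.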
diff --git a/cassandra/src/main/scala/org/apache/zeppelin/cassandra/TextBlockHierarchy.scala b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/TextBlockHierarchy.scala
index be55564..790d397 100644
--- a/cassandra/src/main/scala/org/apache/zeppelin/cassandra/TextBlockHierarchy.scala
+++ b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/TextBlockHierarchy.scala
@@ -17,6 +17,8 @@
package org.apache.zeppelin.cassandra
import com.datastax.driver.core._
+import com.datastax.oss.driver.api.core.ConsistencyLevel
+import com.datastax.oss.driver.api.core.cql.BatchType
/**
* Define a Scala object hierarchy
@@ -42,7 +44,6 @@ object TextBlockHierarchy {
object ConsistencyParam extends ParameterType
object SerialConsistencyParam extends ParameterType
object TimestampParam extends ParameterType
- object RetryPolicyParam extends ParameterType
object FetchSizeParam extends ParameterType
object RequestTimeOutParam extends ParameterType
@@ -63,15 +64,6 @@ object TextBlockHierarchy {
case class RequestTimeOut(value: Int) extends QueryParameters(RequestTimeOutParam)
- abstract class RetryPolicy extends QueryParameters(RetryPolicyParam)
-
- object DefaultRetryPolicy extends RetryPolicy
- object DowngradingRetryPolicy extends RetryPolicy
- object FallThroughRetryPolicy extends RetryPolicy
- object LoggingDefaultRetryPolicy extends RetryPolicy
- object LoggingDowngradingRetryPolicy extends RetryPolicy
- object LoggingFallThroughRetryPolicy extends RetryPolicy
-
sealed trait StatementType
object PrepareStatementType extends StatementType
object RemovePrepareStatementType extends StatementType
@@ -106,7 +98,7 @@ object TextBlockHierarchy {
case class BoundStm(name: String, values:String) extends QueryStatement(BoundStatementType)
- case class BatchStm(batchType: BatchStatement.Type, statements: List[QueryStatement])
+ case class BatchStm(batchType: BatchType, statements: List[QueryStatement])
extends QueryStatement(BatchStatementType)
sealed trait DescribeCommandStatement {
diff --git a/cassandra/src/main/scala/scala/compat/java8/OptionConverters.scala b/cassandra/src/main/scala/scala/compat/java8/OptionConverters.scala
new file mode 100644
index 0000000..49e06c2
--- /dev/null
+++ b/cassandra/src/main/scala/scala/compat/java8/OptionConverters.scala
@@ -0,0 +1,140 @@
+/*
+ * Scala (https://www.scala-lang.org)
+ *
+ * Copyright EPFL and Lightbend, Inc.
+ *
+ * Licensed under Apache License 2.0
+ * (http://www.apache.org/licenses/LICENSE-2.0).
+ *
+ * See the NOTICE file distributed with this work for
+ * additional information regarding copyright ownership.
+ */
+
+package scala.compat.java8
+
+import java.util.{Optional, OptionalDouble, OptionalInt, OptionalLong}
+
+/** This class enables bidirectional conversion between `scala.Option` and the
+ * set of `java.util.Optional` classes.
+ *
+ * The Scala `Option` is generic; its generic counterpart in Java is
+ * `java.util.Optional`. `Option` is enriched with an `asJava` method, while
+ * `Optional` is enriched with `asScala` to perform conversions.
+ *
+ * In addition, both `Option` and `Optional` are enriched with `asPrimitive`
+ * methods that will convert generically contained primitives to the manually
+ * specialized Java versions for primitives, `OptionalDouble`, `OptionalInt`,
+ * and `OptionalLong`. The primitive versions can be converted to the Scala
+ * generic `Option` with `asScala` and to the Java generic `Optional` with
+ * `asGeneric`.
+ *
+ * When calling from Java, methods are more convenient than extension methods,
+ * so `toJava` and `toScala` methods are provided that convert to and from
+ * Scala's `Option`. Note that `toJava(toScala(x))` will result in a generic
+ * `Optional` even if `x` was one of the primitive versions.
+ *
+ * Example usage:
+ *
+ * {{{
+ * import scala.compat.java8.OptionConverters._
+ * val a = Option("example").asJava // Creates java.util.Optional[String] containing "example"
+ * val b = (None: Option[String]).asJava // Creates an empty java.util.Optional[String]
+ * val c = a.asScala // Back to Option("example")
+ * val d = b.asScala // Back to None typed as Option[String]
+ * val e = Option(2.7).asJava // java.util.Optional[Double] containing boxed 2.7
+ * val f = Option(2.7).asPrimitive // java.util.OptionalDouble containing 2.7 (not boxed)
+ * val g = f.asScala // Back to Option(2.7)
+ * val h = f.asGeneric // Same as e
+ * val i = e.asPrimitive // Same as f
+ * val j = toJava(Option("example")) // Same as a
+ * val k = toScala(a) // Same as c
+ * }}}
+ */
+object OptionConverters {
+ /** Type class implementing conversion from generic `Option` or `Optional` to manually specialized variants. */
+ sealed abstract class SpecializerOfOptions[A, That] {
+ /** Converts from `Optional` to a manually specialized variant `That` */
+ def fromJava(o: Optional[A]): That
+ /** Converts from `Option` to a manually specialized variant `That` */
+ def fromScala(o: Option[A]): That
+ }
+
+ /** Implementation of creation of `OptionalDouble` from `Option[Double]` or `Optional[Double]`*/
+ implicit val specializer_OptionalDouble = new SpecializerOfOptions[Double, OptionalDouble] {
+ /** Creates an `OptionalDouble` from `Optional[Double]` */
+ def fromJava(o: Optional[Double]): OptionalDouble = if (o.isPresent) OptionalDouble.of(o.get) else OptionalDouble.empty
+ /** Creates an `OptionalDouble` from `Option[Double]` */
+ def fromScala(o: Option[Double]): OptionalDouble = o match { case Some(d) => OptionalDouble.of(d); case _ => OptionalDouble.empty }
+ }
+
+ /** Implementation of creation of `OptionalInt` from `Option[Int]` or `Optional[Int]`*/
+ implicit val specializer_OptionalInt = new SpecializerOfOptions[Int, OptionalInt] {
+ /** Creates an `OptionalInt` from `Optional[Int]` */
+ def fromJava(o: Optional[Int]): OptionalInt = if (o.isPresent) OptionalInt.of(o.get) else OptionalInt.empty
+ /** Creates an `OptionalInt` from `Option[Int]` */
+ def fromScala(o: Option[Int]): OptionalInt = o match { case Some(d) => OptionalInt.of(d); case _ => OptionalInt.empty }
+ }
+
+ /** Implementation of creation of `OptionalLong` from `Option[Long]` or `Optional[Long]`*/
+ implicit val specializer_OptionalLong = new SpecializerOfOptions[Long, OptionalLong] {
+ /** Creates an `OptionalLong` from `Optional[Long]` */
+ def fromJava(o: Optional[Long]): OptionalLong = if (o.isPresent) OptionalLong.of(o.get) else OptionalLong.empty
+ /** Creates an `OptionalLong` from `Option[Long]` */
+ def fromScala(o: Option[Long]): OptionalLong = o match { case Some(d) => OptionalLong.of(d); case _ => OptionalLong.empty }
+ }
+
+ /** Provides conversions from `java.util.Optional` to Scala `Option` or primitive `java.util.Optional` types */
+ implicit class RichOptionalGeneric[A](val underlying: java.util.Optional[A]) extends AnyVal {
+ /** Create a `scala.Option` version of this `Optional` */
+ def asScala: Option[A] = if (underlying.isPresent) Some(underlying.get) else None
+ /** Create a specialized primitive variant of this generic `Optional`, if an appropriate one exists */
+ def asPrimitive[That](implicit specOp: SpecializerOfOptions[A, That]): That = specOp.fromJava(underlying)
+ }
+
+ /** Provides conversions from `scala.Option` to Java `Optional` types, either generic or primitive */
+ implicit class RichOptionForJava8[A](val underlying: Option[A]) extends AnyVal {
+ /** Create a `java.util.Optional` version of this `Option` (not specialized) */
+ def asJava: Optional[A] = underlying match { case Some(a) => Optional.ofNullable(a); case _ => Optional.empty[A] }
+ /** Create a specialized primitive `java.util.Optional` type, if an appropriate one exists */
+ def asPrimitive[That](implicit specOp: SpecializerOfOptions[A, That]): That = specOp.fromScala(underlying)
+ }
+
+ /** Provides conversions from `java.util.OptionalDouble` to the generic `Optional` and Scala `Option` */
+ implicit class RichOptionalDouble(val underlying: OptionalDouble) extends AnyVal {
+ /** Create a `scala.Option` version of this `OptionalDouble` */
+ def asScala: Option[Double] = if (underlying.isPresent) Some(underlying.getAsDouble) else None
+ /** Create a generic `java.util.Optional` version of this `OptionalDouble` */
+ def asGeneric: Optional[Double] = if (underlying.isPresent) Optional.of(underlying.getAsDouble) else Optional.empty[Double]
+ }
+
+ /** Provides conversions from `java.util.OptionalInt` to the generic `Optional` and Scala `Option` */
+ implicit class RichOptionalInt(val underlying: OptionalInt) extends AnyVal {
+ /** Create a `scala.Option` version of this `OptionalInt` */
+ def asScala: Option[Int] = if (underlying.isPresent) Some(underlying.getAsInt) else None
+ /** Create a generic `java.util.Optional` version of this `OptionalInt` */
+ def asGeneric: Optional[Int] = if (underlying.isPresent) Optional.of(underlying.getAsInt) else Optional.empty[Int]
+ }
+
+ /** Provides conversions from `java.util.OptionalLong` to the generic `Optional` and Scala `Option` */
+ implicit class RichOptionalLong(val underlying: OptionalLong) extends AnyVal {
+ /** Create a `scala.Option` version of this `OptionalLong` */
+ def asScala: Option[Long] = if (underlying.isPresent) Some(underlying.getAsLong) else None
+ /** Create a generic `java.util.Optional` version of this `OptionalLong` */
+ def asGeneric: Optional[Long] = if (underlying.isPresent) Optional.of(underlying.getAsLong) else Optional.empty[Long]
+ }
+
+ /** Conversion from Scala `Option` to Java `Optional` without using implicits, for convenient use from Java. */
+ final def toJava[A](o: Option[A]): Optional[A] = o match { case Some(a) => Optional.ofNullable(a); case _ => Optional.empty[A] }
+
+ /** Conversion from Java `Optional` to Scala `Option` without using implicits, for convenient use from Java */
+ final def toScala[A](o: Optional[A]): Option[A] = if (o.isPresent) Some(o.get) else None
+
+ /** Conversion from Java `OptionalDouble` to Scala `Option` without using implicits, for convenient use from Java */
+ final def toScala(o: OptionalDouble): Option[Double] = if (o.isPresent) Some(o.getAsDouble) else None
+
+ /** Conversion from Java `OptionalInt` to Scala `Option` without using implicits, for convenient use from Java */
+ final def toScala(o: OptionalInt): Option[Int] = if (o.isPresent) Some(o.getAsInt) else None
+
+ /** Conversion from Java `OptionalLong` to Scala `Option` without using implicits, for convenient use from Java */
+ final def toScala(o: OptionalLong): Option[Long] = if (o.isPresent) Some(o.getAsLong) else None
+}
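The vendored `OptionConverters` file above converts between generic optionals and the primitive-specialized `OptionalDouble`, `OptionalInt` and `OptionalLong`. As a rough illustration of what `asPrimitive` and `asGeneric` do for the `Double` case, here is a small Java sketch that uses only `java.util`. The class and method names (`OptionalSpecializationSketch`, `toPrimitive`, `toGeneric`) are made up for this example and are not part of the commit.

```java
import java.util.Optional;
import java.util.OptionalDouble;

public class OptionalSpecializationSketch {
    /** Same logic as fromJava in specializer_OptionalDouble: unbox a generic Optional. */
    public static OptionalDouble toPrimitive(Optional<Double> o) {
        return o.isPresent() ? OptionalDouble.of(o.get()) : OptionalDouble.empty();
    }

    /** Same logic as RichOptionalDouble.asGeneric: box a primitive optional. */
    public static Optional<Double> toGeneric(OptionalDouble o) {
        return o.isPresent() ? Optional.of(o.getAsDouble()) : Optional.empty();
    }

    public static void main(String[] args) {
        OptionalDouble primitive = toPrimitive(Optional.of(2.7));
        Optional<Double> generic = toGeneric(primitive);
        System.out.println(primitive + " <-> " + generic);
    }
}
```

A round trip through both methods preserves both the value and emptiness, which is the property the Scala doc comment describes for `asPrimitive` followed by `asGeneric`.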
diff --git a/cassandra/src/test/java/org/apache/zeppelin/cassandra/CassandraInterpreterTest.java b/cassandra/src/test/java/org/apache/zeppelin/cassandra/CassandraInterpreterTest.java
index 0417dc9..8b40210 100644
--- a/cassandra/src/test/java/org/apache/zeppelin/cassandra/CassandraInterpreterTest.java
+++ b/cassandra/src/test/java/org/apache/zeppelin/cassandra/CassandraInterpreterTest.java
@@ -16,12 +16,32 @@
*/
package org.apache.zeppelin.cassandra;
-import static com.google.common.collect.FluentIterable.from;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.mockito.Mockito.when;
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.internal.core.type.codec.TimestampCodec;
+import org.apache.zeppelin.display.AngularObjectRegistry;
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.InterpreterResult.Code;
+import org.cassandraunit.CQLDataLoader;
+import org.cassandraunit.dataset.cql.ClassPathCQLDataSet;
+import org.cassandraunit.utils.EmbeddedCassandraServerHelper;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Answers;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
-import static com.datastax.driver.core.ProtocolOptions.DEFAULT_MAX_SCHEMA_AGREEMENT_WAIT_SECONDS;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.time.Instant;
+import java.util.Properties;
import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_CLUSTER_NAME;
import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_COMPRESSION_PROTOCOL;
@@ -29,20 +49,12 @@ import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_CREDE
import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_CREDENTIALS_USERNAME;
import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_HOSTS;
import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_LOAD_BALANCING_POLICY;
-import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_MAX_SCHEMA_AGREEMENT_WAIT_SECONDS;
-import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_POOLING_CORE_CONNECTION_PER_HOST_LOCAL;
-import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_POOLING_CORE_CONNECTION_PER_HOST_REMOTE;
import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_POOLING_HEARTBEAT_INTERVAL_SECONDS;
-import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_POOLING_IDLE_TIMEOUT_SECONDS;
-import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_POOLING_MAX_CONNECTION_PER_HOST_LOCAL;
-import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_POOLING_MAX_CONNECTION_PER_HOST_REMOTE;
-import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_POOLING_MAX_REQUESTS_PER_CONNECTION_LOCAL;
-import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_POOLING_MAX_REQUESTS_PER_CONNECTION_REMOTE;
-import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_POOLING_NEW_CONNECTION_THRESHOLD_LOCAL;
-import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_POOLING_NEW_CONNECTION_THRESHOLD_REMOTE;
+import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_POOLING_CONNECTION_PER_HOST_LOCAL;
+import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_POOLING_CONNECTION_PER_HOST_REMOTE;
+import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_POOLING_MAX_REQUESTS_PER_CONNECTION;
import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_POOLING_POOL_TIMEOUT_MILLIS;
import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_PORT;
-import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_PROTOCOL_VERSION;
import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_QUERY_DEFAULT_CONSISTENCY;
import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_QUERY_DEFAULT_FETCH_SIZE;
import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_QUERY_DEFAULT_SERIAL_CONSISTENCY;
@@ -52,80 +64,40 @@ import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_SOCKE
import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_SOCKET_READ_TIMEOUT_MILLIS;
import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_SOCKET_TCP_NO_DELAY;
import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_SPECULATIVE_EXECUTION_POLICY;
-
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Answers;
-import org.mockito.Mock;
-import org.mockito.runners.MockitoJUnitRunner;
-
-import java.io.BufferedReader;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.util.Properties;
-
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.ProtocolVersion;
-import com.datastax.driver.core.Session;
-
-import info.archinnov.achilles.embedded.CassandraEmbeddedServerBuilder;
-
-import org.apache.zeppelin.display.AngularObjectRegistry;
-import org.apache.zeppelin.interpreter.Interpreter;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterResult;
-import org.apache.zeppelin.interpreter.InterpreterResult.Code;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
-public class CassandraInterpreterTest {
+public class CassandraInterpreterTest { //extends AbstractCassandraUnit4CQLTestCase {
private static final String ARTISTS_TABLE = "zeppelin.artists";
+ private static final int DEFAULT_UNIT_TEST_PORT = 9142;
- public static Session session = CassandraEmbeddedServerBuilder
- .noEntityPackages()
- .withKeyspaceName("zeppelin")
- .withScript("prepare_schema.cql")
- .withScript("prepare_data.cql")
- .withProtocolVersion(ProtocolVersion.V3)
- .buildNativeSessionOnly();
-
- private static CassandraInterpreter interpreter;
+ private static volatile CassandraInterpreter interpreter;
@Mock(answer = Answers.RETURNS_DEEP_STUBS)
private InterpreterContext intrContext;
@BeforeClass
- public static void setUp() {
- Properties properties = new Properties();
- final Cluster cluster = session.getCluster();
+ public static synchronized void setUp() throws IOException, InterruptedException {
+ EmbeddedCassandraServerHelper.startEmbeddedCassandra();
+ CqlSession session = EmbeddedCassandraServerHelper.getSession();
+ new CQLDataLoader(session).load(new ClassPathCQLDataSet("prepare_all.cql", "zeppelin"));
- properties.setProperty(CASSANDRA_CLUSTER_NAME, cluster.getClusterName());
+ Properties properties = new Properties();
+ properties.setProperty(CASSANDRA_CLUSTER_NAME, EmbeddedCassandraServerHelper.getClusterName());
properties.setProperty(CASSANDRA_COMPRESSION_PROTOCOL, "NONE");
properties.setProperty(CASSANDRA_CREDENTIALS_USERNAME, "none");
properties.setProperty(CASSANDRA_CREDENTIALS_PASSWORD, "none");
- properties.setProperty(CASSANDRA_PROTOCOL_VERSION, "3");
properties.setProperty(CASSANDRA_LOAD_BALANCING_POLICY, "DEFAULT");
properties.setProperty(CASSANDRA_RETRY_POLICY, "DEFAULT");
properties.setProperty(CASSANDRA_RECONNECTION_POLICY, "DEFAULT");
properties.setProperty(CASSANDRA_SPECULATIVE_EXECUTION_POLICY, "DEFAULT");
- properties.setProperty(CASSANDRA_MAX_SCHEMA_AGREEMENT_WAIT_SECONDS,
- DEFAULT_MAX_SCHEMA_AGREEMENT_WAIT_SECONDS + "");
-
- properties.setProperty(CASSANDRA_POOLING_NEW_CONNECTION_THRESHOLD_LOCAL, "100");
- properties.setProperty(CASSANDRA_POOLING_NEW_CONNECTION_THRESHOLD_REMOTE, "100");
- properties.setProperty(CASSANDRA_POOLING_CORE_CONNECTION_PER_HOST_LOCAL, "2");
- properties.setProperty(CASSANDRA_POOLING_CORE_CONNECTION_PER_HOST_REMOTE, "1");
- properties.setProperty(CASSANDRA_POOLING_MAX_CONNECTION_PER_HOST_LOCAL, "8");
- properties.setProperty(CASSANDRA_POOLING_MAX_CONNECTION_PER_HOST_REMOTE, "2");
- properties.setProperty(CASSANDRA_POOLING_MAX_REQUESTS_PER_CONNECTION_LOCAL, "1024");
- properties.setProperty(CASSANDRA_POOLING_MAX_REQUESTS_PER_CONNECTION_REMOTE, "256");
+ properties.setProperty(CASSANDRA_POOLING_CONNECTION_PER_HOST_LOCAL, "2");
+ properties.setProperty(CASSANDRA_POOLING_CONNECTION_PER_HOST_REMOTE, "1");
+ properties.setProperty(CASSANDRA_POOLING_MAX_REQUESTS_PER_CONNECTION, "1024");
- properties.setProperty(CASSANDRA_POOLING_IDLE_TIMEOUT_SECONDS, "120");
properties.setProperty(CASSANDRA_POOLING_POOL_TIMEOUT_MILLIS, "5000");
properties.setProperty(CASSANDRA_POOLING_HEARTBEAT_INTERVAL_SECONDS, "30");
@@ -137,10 +109,9 @@ public class CassandraInterpreterTest {
properties.setProperty(CASSANDRA_SOCKET_READ_TIMEOUT_MILLIS, "12000");
properties.setProperty(CASSANDRA_SOCKET_TCP_NO_DELAY, "true");
- properties.setProperty(CASSANDRA_HOSTS, from(cluster.getMetadata().getAllHosts()).first()
- .get().getAddress().getHostAddress());
- properties.setProperty(CASSANDRA_PORT, cluster.getConfiguration().getProtocolOptions()
- .getPort() + "");
+ properties.setProperty(CASSANDRA_HOSTS, EmbeddedCassandraServerHelper.getHost());
+ properties.setProperty(CASSANDRA_PORT,
+ Integer.toString(EmbeddedCassandraServerHelper.getNativeTransportPort()));
interpreter = new CassandraInterpreter(properties);
interpreter.open();
}
@@ -157,9 +128,6 @@ public class CassandraInterpreterTest {
@Test
public void should_create_cluster_and_session_upon_call_to_open() throws Exception {
- assertThat(interpreter.cluster).isNotNull();
- assertThat(interpreter.cluster.getClusterName()).isEqualTo(session.getCluster()
- .getClusterName());
assertThat(interpreter.session).isNotNull();
assertThat(interpreter.helper).isNotNull();
}
@@ -176,19 +144,21 @@ public class CassandraInterpreterTest {
assertThat(actual).isNotNull();
assertThat(actual.code()).isEqualTo(Code.SUCCESS);
assertThat(actual.message().get(0).getData()).isEqualTo("name\tborn\tcountry\tdied\tgender\t" +
- "styles\ttype\n" +
- "Bogdan Raczynski\t1977-01-01\tPoland\tnull\tMale\t[Dance, Electro]\tPerson\n" +
- "Krishna Das\t1947-05-31\tUSA\tnull\tMale\t[Unknown]\tPerson\n" +
- "Sheryl Crow\t1962-02-11\tUSA\tnull\tFemale\t" +
- "[Classic, Rock, Country, Blues, Pop, Folk]\tPerson\n" +
- "Doof\t1968-08-31\tUnited Kingdom\tnull\tnull\t[Unknown]\tPerson\n" +
- "House of Large Sizes\t1986-01-01\tUSA\t2003\tnull\t[Unknown]\tGroup\n" +
- "Fanfarlo\t2006-01-01\tUnited Kingdom\tnull\tnull\t" +
- "[Rock, Indie, Pop, Classic]\tGroup\n" +
- "Jeff Beck\t1944-06-24\tUnited Kingdom\tnull\tMale\t[Rock, Pop, Classic]\tPerson\n" +
- "Los Paranoias\tnull\tUnknown\tnull\tnull\t[Unknown]\tnull\n" +
- "…And You Will Know Us by the Trail of Dead\t1994-01-01\tUSA\tnull\tnull\t" +
- "[Rock, Pop, Classic]\tGroup\n");
+ "styles\ttype\n" +
+ "'Bogdan Raczynski'\t'1977-01-01'\t'Poland'\tnull\t'Male'\t" +
+ "['Dance','Electro']\t'Person'\n" +
+ "'Krishna Das'\t'1947-05-31'\t'USA'\tnull\t'Male'\t['Unknown']\t'Person'\n" +
+ "'Sheryl Crow'\t'1962-02-11'\t'USA'\tnull\t'Female'\t" +
+ "['Classic','Rock','Country','Blues','Pop','Folk']\t'Person'\n" +
+ "'Doof'\t'1968-08-31'\t'United Kingdom'\tnull\tnull\t['Unknown']\t'Person'\n" +
+ "'House of Large Sizes'\t'1986-01-01'\t'USA'\t'2003'\tnull\t['Unknown']\t'Group'\n" +
+ "'Fanfarlo'\t'2006-01-01'\t'United Kingdom'\tnull\tnull\t" +
+ "['Rock','Indie','Pop','Classic']\t'Group'\n" +
+ "'Jeff Beck'\t'1944-06-24'\t'United Kingdom'\tnull\t'Male'\t" +
+ "['Rock','Pop','Classic']\t'Person'\n" +
+ "'Los Paranoias'\tnull\t'Unknown'\tnull\tnull\t['Unknown']\tnull\n" +
+ "'…And You Will Know Us by the Trail of Dead'\t'1994-01-01'\t'USA'\tnull\tnull\t" +
+ "['Rock','Pop','Classic']\t'Group'\n");
}
@Test
@@ -203,9 +173,10 @@ public class CassandraInterpreterTest {
assertThat(actual).isNotNull();
assertThat(actual.code()).isEqualTo(Code.SUCCESS);
assertThat(actual.message().get(0).getData())
- .isEqualTo("name\tborn\tcountry\tdied\tgender\tstyles\ttype\n" +
- "Bogdan Raczynski\t1977-01-01\tPoland\tnull\tMale\t[Dance, Electro]\tPerson\n" +
- "Krishna Das\t1947-05-31\tUSA\tnull\tMale\t[Unknown]\tPerson\n");
+ .isEqualTo("name\tborn\tcountry\tdied\tgender\tstyles\ttype\n" +
+ "'Bogdan Raczynski'\t'1977-01-01'\t'Poland'\tnull\t'Male'\t" +
+ "['Dance','Electro']\t'Person'\n" +
+ "'Krishna Das'\t'1947-05-31'\t'USA'\tnull\t'Male'\t['Unknown']\t'Person'\n");
}
@Test
@@ -231,9 +202,9 @@ public class CassandraInterpreterTest {
//Then
assertThat(actual.code()).isEqualTo(Code.SUCCESS);
assertThat(actual.message().get(0).getData()).isEqualTo("title\tartist\tyear\n" +
- "The Impossible Dream EP\tCarter the Unstoppable Sex Machine\t1992\n" +
- "The Way You Are\tTears for Fears\t1983\n" +
- "Primitive\tSoulfly\t2003\n");
+ "'The Impossible Dream EP'\t'Carter the Unstoppable Sex Machine'\t1992\n" +
+ "'The Way You Are'\t'Tears for Fears'\t1983\n" +
+ "'Primitive'\t'Soulfly'\t2003\n");
}
@Test
@@ -262,8 +233,9 @@ public class CassandraInterpreterTest {
//Then
assertThat(actual.code()).isEqualTo(Code.ERROR);
+ String s = "line 1:9 mismatched input 'zeppelin' expecting K_FROM (SELECT * [zeppelin]...)";
assertThat(actual.message().get(0).getData())
- .contains("line 1:9 missing K_FROM at 'zeppelin' (SELECT * [zeppelin]....)");
+ .contains(s);
}
@Test
@@ -302,15 +274,22 @@ public class CassandraInterpreterTest {
String statement2 = "@timestamp=15\n" +
"INSERT INTO zeppelin.ts(key,val) VALUES('k','v2');";
+ CqlSession session = EmbeddedCassandraServerHelper.getSession();
// Insert v1 with current timestamp
interpreter.interpret(statement1, intrContext);
+ System.out.println("going to read data from zeppelin.ts;");
+ session.execute("SELECT val FROM zeppelin.ts LIMIT 1")
+ .forEach(x -> System.out.println("row " + x ));
Thread.sleep(1);
//When
// Insert v2 with past timestamp
interpreter.interpret(statement2, intrContext);
- final String actual = session.execute("SELECT * FROM zeppelin.ts LIMIT 1").one()
+ System.out.println("going to read data from zeppelin.ts;");
+ session.execute("SELECT val FROM zeppelin.ts LIMIT 1")
+ .forEach(x -> System.out.println("row " + x ));
+ final String actual = session.execute("SELECT val FROM zeppelin.ts LIMIT 1").one()
.getString("val");
//Then
@@ -318,20 +297,6 @@ public class CassandraInterpreterTest {
}
@Test
- public void should_execute_statement_with_retry_policy() throws Exception {
- //Given
- String statement = "@retryPolicy=" + interpreter.LOGGING_DOWNGRADING_RETRY + "\n" +
- "@consistency=THREE\n" +
- "SELECT * FROM zeppelin.artists LIMIT 1;";
-
- //When
- final InterpreterResult actual = interpreter.interpret(statement, intrContext);
-
- //Then
- assertThat(actual.code()).isEqualTo(Code.SUCCESS);
- }
-
- @Test
public void should_execute_statement_with_request_timeout() throws Exception {
//Given
String statement = "@requestTimeOut=10000000\n" +
@@ -358,7 +323,7 @@ public class CassandraInterpreterTest {
//Then
assertThat(actual.code()).isEqualTo(Code.SUCCESS);
assertThat(actual.message().get(0).getData()).isEqualTo("key\tval\n" +
- "myKey\tmyValue\n");
+ "'myKey'\t'myValue'\n");
}
@Test
@@ -380,14 +345,14 @@ public class CassandraInterpreterTest {
assertThat(actual.code()).isEqualTo(Code.SUCCESS);
assertThat(actual.message().get(0).getData()).isEqualTo(
"login\taddresses\tage\tdeceased\tfirstname\tlast_update\tlastname\tlocation\n" +
- "jdoe\t" +
+ "'jdoe'\t" +
"{street_number:3,street_name:'Beverly Hills Bld',zip_code:90209," +
"country:'USA',extra_info:['Right on the hills','Next to the post box']," +
- "phone_numbers:{'office':2015790847,'home':2016778524}}\tnull\t" +
+ "phone_numbers:{'home':2016778524,'office':2015790847}}\tnull\t" +
"null\t" +
- "John\t" +
+ "'John'\t" +
"null\t" +
- "DOE\t" +
+ "'DOE'\t" +
"('USA',90209,'Beverly Hills')\n");
}
@@ -424,7 +389,7 @@ public class CassandraInterpreterTest {
//Then
assertThat(actual.code()).isEqualTo(Code.SUCCESS);
assertThat(actual.message().get(0).getData()).isEqualTo("firstname\tlastname\tage\n" +
- "Helen\tSUE\t27\n");
+ "'Helen'\t'SUE'\t27\n");
}
@Test
@@ -456,9 +421,9 @@ public class CassandraInterpreterTest {
//Then
assertThat(actual.code()).isEqualTo(Code.SUCCESS);
assertThat(actual.message().get(0).getData()).isEqualTo("name\tcountry\tstyles\n" +
- "Bogdan Raczynski\tPoland\t[Dance, Electro]\n" +
- "Krishna Das\tUSA\t[Unknown]\n" +
- "Sheryl Crow\tUSA\t[Classic, Rock, Country, Blues, Pop, Folk]\n");
+ "'Bogdan Raczynski'\t'Poland'\t['Dance','Electro']\n" +
+ "'Krishna Das'\t'USA'\t['Unknown']\n" +
+ "'Sheryl Crow'\t'USA'\t['Classic','Rock','Country','Blues','Pop','Folk']\n");
}
@Test
@@ -473,8 +438,9 @@ public class CassandraInterpreterTest {
//Then
assertThat(actual.code()).isEqualTo(Code.SUCCESS);
+ Instant tm = Instant.parse("2015-07-30T12:00:01Z");
assertThat(actual.message().get(0).getData()).contains("last_update\n" +
- "Thu Jul 30 12:00:01");
+ new TimestampCodec().format(tm));
}
@Test
@@ -490,7 +456,7 @@ public class CassandraInterpreterTest {
//Then
assertThat(actual.code()).isEqualTo(Code.SUCCESS);
assertThat(actual.message().get(0).getData()).isEqualTo("firstname\tlastname\n" +
- "null\tNULL\n");
+ "null\t'NULL'\n");
}
@Test
@@ -506,7 +472,7 @@ public class CassandraInterpreterTest {
//Then
assertThat(actual.code()).isEqualTo(Code.SUCCESS);
assertThat(actual.message().get(0).getData()).isEqualTo("login\tdeceased\n" +
- "bind_bool\tfalse\n");
+ "'bind_bool'\tfalse\n");
}
@Test
@@ -537,11 +503,8 @@ public class CassandraInterpreterTest {
//When
final InterpreterResult actual = interpreter.interpret(query, intrContext);
- final Cluster cluster = session.getCluster();
- final int port = cluster.getConfiguration().getProtocolOptions().getPort();
- final String address = cluster.getMetadata().getAllHosts().iterator().next()
- .getAddress().getHostAddress()
- .replaceAll("/", "").replaceAll("\\[", "").replaceAll("\\]", "");
+ final int port = EmbeddedCassandraServerHelper.getNativeTransportPort();
+ final String address = EmbeddedCassandraServerHelper.getHost();
//Then
final String expected = rawResult.replaceAll("TRIED_HOSTS", address + ":" + port)
.replaceAll("QUERIED_HOSTS", address + ":" + port);
@@ -560,7 +523,8 @@ public class CassandraInterpreterTest {
//Then
assertThat(actual.code()).isEqualTo(Code.ERROR);
- assertThat(actual.message().get(0).getData()).contains("All host(s) tried for query failed");
+ assertThat(actual.message().get(0).getData())
+ .contains("All 1 node(s) tried for the query failed");
}
@Test
@@ -725,6 +689,37 @@ public class CassandraInterpreterTest {
}
@Test
+ public void should_describe_all_tables() throws Exception {
+ //Given
+ String query = "DESCRIBE TABLES;";
+ final String expected = reformatHtml(readTestResource(
+ "/scalate/DescribeTables.html"));
+
+ //When
+ final InterpreterResult actual = interpreter.interpret(query, intrContext);
+
+ //Then
+ assertThat(actual.code()).isEqualTo(Code.SUCCESS);
+ assertThat(reformatHtml(actual.message().get(0).getData())).isEqualTo(expected);
+ }
+
+ @Test
+ public void should_describe_all_udts() throws Exception {
+ //Given
+ String query = "DESCRIBE TYPES;";
+ final String expected = reformatHtml(readTestResource(
+ "/scalate/DescribeTypes.html"));
+
+ //When
+ final InterpreterResult actual = interpreter.interpret(query, intrContext);
+
+ //Then
+ assertThat(actual.code()).isEqualTo(Code.SUCCESS);
+ assertThat(reformatHtml(actual.message().get(0).getData())).isEqualTo(expected);
+ }
+
+
+ @Test
public void should_error_describing_non_existing_table() throws Exception {
//Given
String query = "USE system;\n" +
diff --git a/cassandra/src/test/java/org/apache/zeppelin/cassandra/InterpreterLogicTest.java b/cassandra/src/test/java/org/apache/zeppelin/cassandra/InterpreterLogicTest.java
index 9d4c9ee..0ae802e 100644
--- a/cassandra/src/test/java/org/apache/zeppelin/cassandra/InterpreterLogicTest.java
+++ b/cassandra/src/test/java/org/apache/zeppelin/cassandra/InterpreterLogicTest.java
@@ -16,6 +16,12 @@
*/
package org.apache.zeppelin.cassandra;
+import static com.datastax.oss.driver.api.core.ConsistencyLevel.ALL;
+import static com.datastax.oss.driver.api.core.ConsistencyLevel.LOCAL_SERIAL;
+import static com.datastax.oss.driver.api.core.ConsistencyLevel.ONE;
+import static com.datastax.oss.driver.api.core.ConsistencyLevel.QUORUM;
+import static com.datastax.oss.driver.api.core.ConsistencyLevel.SERIAL;
+import static com.datastax.oss.driver.api.core.cql.BatchType.UNLOGGED;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
@@ -26,12 +32,11 @@ import static org.mockito.Mockito.when;
import static java.util.Arrays.asList;
-import static com.datastax.driver.core.BatchStatement.Type.UNLOGGED;
-import static com.datastax.driver.core.ConsistencyLevel.ALL;
-import static com.datastax.driver.core.ConsistencyLevel.LOCAL_SERIAL;
-import static com.datastax.driver.core.ConsistencyLevel.ONE;
-import static com.datastax.driver.core.ConsistencyLevel.QUORUM;
-import static com.datastax.driver.core.ConsistencyLevel.SERIAL;
+import com.datastax.oss.driver.api.core.ConsistencyLevel;
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.cql.BatchStatement;
+import com.datastax.oss.driver.api.core.cql.BatchableStatement;
+import com.datastax.oss.driver.api.core.cql.SimpleStatement;
import org.junit.Rule;
import org.junit.Test;
@@ -43,27 +48,20 @@ import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
+import java.time.Instant;
+import java.time.ZoneOffset;
+import java.time.ZonedDateTime;
+import java.time.temporal.ChronoField;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Calendar;
-import java.util.Date;
import java.util.List;
-import com.datastax.driver.core.BatchStatement;
-import com.datastax.driver.core.ConsistencyLevel;
-import com.datastax.driver.core.Session;
-import com.datastax.driver.core.SimpleStatement;
-import com.datastax.driver.core.Statement;
-
import scala.Option;
import org.apache.zeppelin.cassandra.TextBlockHierarchy.AnyBlock;
import org.apache.zeppelin.cassandra.TextBlockHierarchy.Consistency;
-import org.apache.zeppelin.cassandra.TextBlockHierarchy.DowngradingRetryPolicy$;
-import org.apache.zeppelin.cassandra.TextBlockHierarchy.LoggingDefaultRetryPolicy$;
import org.apache.zeppelin.cassandra.TextBlockHierarchy.QueryParameters;
import org.apache.zeppelin.cassandra.TextBlockHierarchy.RequestTimeOut;
-import org.apache.zeppelin.cassandra.TextBlockHierarchy.RetryPolicy;
import org.apache.zeppelin.cassandra.TextBlockHierarchy.SerialConsistency;
import org.apache.zeppelin.cassandra.TextBlockHierarchy.SimpleStm;
import org.apache.zeppelin.cassandra.TextBlockHierarchy.Timestamp;
@@ -82,7 +80,7 @@ public class InterpreterLogicTest {
private InterpreterContext intrContext;
@Mock
- private Session session;
+ private CqlSession session;
final InterpreterLogic helper = new InterpreterLogic(session);
@@ -268,19 +266,6 @@ public class InterpreterLogicTest {
}
@Test
- public void should_extract_retry_policy_option() throws Exception {
- //Given
- List<QueryParameters> options = Arrays.<QueryParameters>asList(DowngradingRetryPolicy$.MODULE$,
- LoggingDefaultRetryPolicy$.MODULE$);
-
- //When
- final CassandraQueryOptions actual = helper.extractQueryOptions(toScalaList(options));
-
- //Then
- assertThat(actual.retryPolicy().get()).isSameAs(DowngradingRetryPolicy$.MODULE$);
- }
-
- @Test
public void should_extract_request_timeout_option() throws Exception {
//Given
List<QueryParameters> options = Arrays.<QueryParameters>asList(new RequestTimeOut(100));
@@ -299,7 +284,6 @@ public class InterpreterLogicTest {
CassandraQueryOptions options = new CassandraQueryOptions(Option.apply(QUORUM),
Option.<ConsistencyLevel>empty(),
Option.empty(),
- Option.<RetryPolicy>empty(),
Option.empty(),
Option.empty());
@@ -309,20 +293,20 @@ public class InterpreterLogicTest {
//Then
assertThat(actual).isNotNull();
- assertThat(actual.getQueryString()).isEqualTo("SELECT * FROM users LIMIT 10;");
+ assertThat(actual.getQuery()).isEqualTo("SELECT * FROM users LIMIT 10;");
assertThat(actual.getConsistencyLevel()).isSameAs(QUORUM);
}
@Test
public void should_generate_batch_statement() throws Exception {
//Given
- Statement st1 = new SimpleStatement("SELECT * FROM users LIMIT 10;");
- Statement st2 = new SimpleStatement("INSERT INTO users(id) VALUES(10);");
- Statement st3 = new SimpleStatement("UPDATE users SET name = 'John DOE' WHERE id=10;");
+ SimpleStatement st1 = SimpleStatement.newInstance("SELECT * FROM users LIMIT 10;");
+ SimpleStatement st2 = SimpleStatement.newInstance("INSERT INTO users(id) VALUES(10);");
+ SimpleStatement st3 = SimpleStatement.newInstance(
+ "UPDATE users SET name = 'John DOE' WHERE id=10;");
CassandraQueryOptions options = new CassandraQueryOptions(Option.apply(QUORUM),
Option.<ConsistencyLevel>empty(),
Option.empty(),
- Option.<RetryPolicy>empty(),
Option.empty(),
Option.empty());
@@ -332,7 +316,10 @@ public class InterpreterLogicTest {
//Then
assertThat(actual).isNotNull();
- final List<Statement> statements = new ArrayList<>(actual.getStatements());
+ List<BatchableStatement> statements = new ArrayList<BatchableStatement>();
+ for (BatchableStatement b: actual) {
+ statements.add(b);
+ }
assertThat(statements).hasSize(3);
assertThat(statements.get(0)).isSameAs(st1);
assertThat(statements.get(1)).isSameAs(st2);
@@ -359,18 +346,17 @@ public class InterpreterLogicTest {
String dateString = "2015-07-30 12:00:01";
//When
- final Date actual = helper.parseDate(dateString);
+ final Instant actual = helper.parseDate(dateString);
//Then
- Calendar calendar = Calendar.getInstance();
- calendar.setTime(actual);
-
- assertThat(calendar.get(Calendar.YEAR)).isEqualTo(2015);
- assertThat(calendar.get(Calendar.MONTH)).isEqualTo(Calendar.JULY);
- assertThat(calendar.get(Calendar.DAY_OF_MONTH)).isEqualTo(30);
- assertThat(calendar.get(Calendar.HOUR_OF_DAY)).isEqualTo(12);
- assertThat(calendar.get(Calendar.MINUTE)).isEqualTo(0);
- assertThat(calendar.get(Calendar.SECOND)).isEqualTo(1);
+ ZonedDateTime dt = actual.atZone(ZoneOffset.UTC);
+
+ assertThat(dt.getLong(ChronoField.YEAR_OF_ERA)).isEqualTo(2015);
+ assertThat(dt.getLong(ChronoField.MONTH_OF_YEAR)).isEqualTo(7);
+ assertThat(dt.getLong(ChronoField.DAY_OF_MONTH)).isEqualTo(30);
+ assertThat(dt.getLong(ChronoField.HOUR_OF_DAY)).isEqualTo(12);
+ assertThat(dt.getLong(ChronoField.MINUTE_OF_HOUR)).isEqualTo(0);
+ assertThat(dt.getLong(ChronoField.SECOND_OF_MINUTE)).isEqualTo(1);
}
@Test
@@ -379,19 +365,18 @@ public class InterpreterLogicTest {
String dateString = "2015-07-30 12:00:01.123";
//When
- final Date actual = helper.parseDate(dateString);
+ final Instant actual = helper.parseDate(dateString);
//Then
- Calendar calendar = Calendar.getInstance();
- calendar.setTime(actual);
-
- assertThat(calendar.get(Calendar.YEAR)).isEqualTo(2015);
- assertThat(calendar.get(Calendar.MONTH)).isEqualTo(Calendar.JULY);
- assertThat(calendar.get(Calendar.DAY_OF_MONTH)).isEqualTo(30);
- assertThat(calendar.get(Calendar.HOUR_OF_DAY)).isEqualTo(12);
- assertThat(calendar.get(Calendar.MINUTE)).isEqualTo(0);
- assertThat(calendar.get(Calendar.SECOND)).isEqualTo(1);
- assertThat(calendar.get(Calendar.MILLISECOND)).isEqualTo(123);
+ ZonedDateTime dt = actual.atZone(ZoneOffset.UTC);
+
+ assertThat(dt.getLong(ChronoField.YEAR_OF_ERA)).isEqualTo(2015);
+ assertThat(dt.getLong(ChronoField.MONTH_OF_YEAR)).isEqualTo(7);
+ assertThat(dt.getLong(ChronoField.DAY_OF_MONTH)).isEqualTo(30);
+ assertThat(dt.getLong(ChronoField.HOUR_OF_DAY)).isEqualTo(12);
+ assertThat(dt.getLong(ChronoField.MINUTE_OF_HOUR)).isEqualTo(0);
+ assertThat(dt.getLong(ChronoField.SECOND_OF_MINUTE)).isEqualTo(1);
+ assertThat(dt.getLong(ChronoField.MILLI_OF_SECOND)).isEqualTo(123);
}
private <A> scala.collection.immutable.List<A> toScalaList(java.util.List<A> list) {
diff --git a/cassandra/src/test/resources/application.conf b/cassandra/src/test/resources/application.conf
new file mode 100644
index 0000000..961261a
--- /dev/null
+++ b/cassandra/src/test/resources/application.conf
@@ -0,0 +1,3 @@
+datastax-java-driver {
+ advanced.request.warn-if-set-keyspace = false
+}
diff --git a/cassandra/src/test/resources/prepare_schema.cql b/cassandra/src/test/resources/prepare_all.cql
similarity index 63%
rename from cassandra/src/test/resources/prepare_schema.cql
rename to cassandra/src/test/resources/prepare_all.cql
index bdf9773..ec40293 100644
--- a/cassandra/src/test/resources/prepare_schema.cql
+++ b/cassandra/src/test/resources/prepare_all.cql
@@ -111,4 +111,23 @@ CREATE TABLE IF NOT EXISTS live_data.stations (
station_id uuid,
sensors frozen<map<uuid,geolocation>>,
PRIMARY KEY (station_id)
-);
\ No newline at end of file
+);
+
+-- insert data
+TRUNCATE zeppelin.artists;
+
+INSERT INTO zeppelin.artists(name,born,country,died,gender,styles,type) VALUES('Bogdan Raczynski','1977-01-01','Poland',null,'Male',['Dance', 'Electro'],'Person');
+INSERT INTO zeppelin.artists(name,born,country,died,gender,styles,type) VALUES('Krishna Das','1947-05-31','USA',null,'Male',['Unknown'],'Person');
+INSERT INTO zeppelin.artists(name,born,country,died,gender,styles,type) VALUES('Sheryl Crow','1962-02-11','USA',null,'Female',['Classic', 'Rock', 'Country', 'Blues', 'Pop', 'Folk'],'Person');
+INSERT INTO zeppelin.artists(name,born,country,died,gender,styles,type) VALUES('Doof','1968-08-31','United Kingdom',null,null,['Unknown'],'Person');
+INSERT INTO zeppelin.artists(name,born,country,died,gender,styles,type) VALUES('House of Large Sizes','1986-01-01','USA','2003',null,['Unknown'],'Group');
+INSERT INTO zeppelin.artists(name,born,country,died,gender,styles,type) VALUES('Fanfarlo','2006-01-01','United Kingdom',null,null,['Rock', 'Indie', 'Pop', 'Classic'],'Group');
+INSERT INTO zeppelin.artists(name,born,country,died,gender,styles,type) VALUES('Jeff Beck','1944-06-24','United Kingdom',null,'Male',['Rock', 'Pop', 'Classic'],'Person');
+INSERT INTO zeppelin.artists(name,born,country,died,gender,styles,type) VALUES('Los Paranoias',null,'Unknown',null,null,['Unknown'],null);
+INSERT INTO zeppelin.artists(name,born,country,died,gender,styles,type) VALUES('…And You Will Know Us by the Trail of Dead','1994-01-01','USA',null,null,['Rock', 'Pop', 'Classic'],'Group');
+
+TRUNCATE zeppelin.ts;
+
+TRUNCATE zeppelin.prepared;
+
+TRUNCATE zeppelin.users;
diff --git a/cassandra/src/test/resources/prepare_data.cql b/cassandra/src/test/resources/prepare_data.cql
deleted file mode 100644
index 590a97c..0000000
--- a/cassandra/src/test/resources/prepare_data.cql
+++ /dev/null
@@ -1,18 +0,0 @@
-
-TRUNCATE zeppelin.artists;
-
-INSERT INTO zeppelin.artists(name,born,country,died,gender,styles,type) VALUES('Bogdan Raczynski','1977-01-01','Poland',null,'Male',['Dance', 'Electro'],'Person');
-INSERT INTO zeppelin.artists(name,born,country,died,gender,styles,type) VALUES('Krishna Das','1947-05-31','USA',null,'Male',['Unknown'],'Person');
-INSERT INTO zeppelin.artists(name,born,country,died,gender,styles,type) VALUES('Sheryl Crow','1962-02-11','USA',null,'Female',['Classic', 'Rock', 'Country', 'Blues', 'Pop', 'Folk'],'Person');
-INSERT INTO zeppelin.artists(name,born,country,died,gender,styles,type) VALUES('Doof','1968-08-31','United Kingdom',null,null,['Unknown'],'Person');
-INSERT INTO zeppelin.artists(name,born,country,died,gender,styles,type) VALUES('House of Large Sizes','1986-01-01','USA','2003',null,['Unknown'],'Group');
-INSERT INTO zeppelin.artists(name,born,country,died,gender,styles,type) VALUES('Fanfarlo','2006-01-01','United Kingdom',null,null,['Rock', 'Indie', 'Pop', 'Classic'],'Group');
-INSERT INTO zeppelin.artists(name,born,country,died,gender,styles,type) VALUES('Jeff Beck','1944-06-24','United Kingdom',null,'Male',['Rock', 'Pop', 'Classic'],'Person');
-INSERT INTO zeppelin.artists(name,born,country,died,gender,styles,type) VALUES('Los Paranoias',null,'Unknown',null,null,['Unknown'],null);
-INSERT INTO zeppelin.artists(name,born,country,died,gender,styles,type) VALUES('…And You Will Know Us by the Trail of Dead','1994-01-01','USA',null,null,['Rock', 'Pop', 'Classic'],'Group');
-
-TRUNCATE zeppelin.ts;
-
-TRUNCATE zeppelin.prepared;
-
-TRUNCATE zeppelin.users;
diff --git a/cassandra/src/test/resources/scalate/DescribeKeyspace_live_data.html b/cassandra/src/test/resources/scalate/DescribeKeyspace_live_data.html
index 344780a..22086f5 100644
--- a/cassandra/src/test/resources/scalate/DescribeKeyspace_live_data.html
+++ b/cassandra/src/test/resources/scalate/DescribeKeyspace_live_data.html
@@ -1 +1 @@
-<br/><br/><nav class="navbar navbar-default"><ul class="nav navbar-nav"><li role="presentation" class="dropdown"><a class="dropdown-toggle" data-toggle="dropdown" role="button" aria-haspopup="true" aria-expanded="false"><span class="text-danger"><i class="glyphicon glyphicon-folder-open"/> <strong>live_data</strong></span><span class="text-danger caret"></span><ul class="dropdown-menu"><li class="dropdown-header"><span class="text-primary">Tables</span></li><li><a role="button [...]
\ No newline at end of file
+<br/><br/><nav class="navbar navbar-default"><ul class="nav navbar-nav"><li role="presentation" class="dropdown"><a class="dropdown-toggle" data-toggle="dropdown" role="button" aria-haspopup="true" aria-expanded="false"><span class="text-danger"><i class="glyphicon glyphicon-folder-open"/> <strong>live_data</strong></span><span class="text-danger caret"></span><ul class="dropdown-menu"><li class="dropdown-header"><span class="text-primary">Tables</span></li><li><a role="button [...]
diff --git a/cassandra/src/test/resources/scalate/DescribeKeyspaces.html b/cassandra/src/test/resources/scalate/DescribeKeyspaces.html
index b5c364e..76a8ed9 100644
--- a/cassandra/src/test/resources/scalate/DescribeKeyspaces.html
+++ b/cassandra/src/test/resources/scalate/DescribeKeyspaces.html
@@ -1 +1 @@
-<br/><br/><nav class="navbar navbar-default"><ul class="nav navbar-nav"><li role="presentation" class="dropdown"><a class="dropdown-toggle" data-toggle="dropdown" role="button" aria-haspopup="true" aria-expanded="false"><span class="text-muted"><i class="glyphicon glyphicon-dashboard"/> <strong>Test Cluster</strong></span><span class="text-muted caret"></span><ul class="dropdown-menu"><li class="dropdown-header"><span class="text-danger">Keyspaces</span></li><li><a role="button" dat [...]
\ No newline at end of file
+<br/><br/><nav class="navbar navbar-default"><ul class="nav navbar-nav"><li role="presentation" class="dropdown"><a class="dropdown-toggle" data-toggle="dropdown" role="button" aria-haspopup="true" aria-expanded="false"><span class="text-muted"><i class="glyphicon glyphicon-dashboard"/> <strong>Test Cluster</strong></span><span class="text-muted caret"></span><ul class="dropdown-menu"><li class="dropdown-header"><span class="text-danger">Keyspaces</span></li><li><a role="button" dat [...]
\ No newline at end of file
diff --git a/cassandra/src/test/resources/scalate/DescribeTable_live_data_complex_table.html b/cassandra/src/test/resources/scalate/DescribeTable_live_data_complex_table.html
index fd3ee67..e6be8b4 100644
--- a/cassandra/src/test/resources/scalate/DescribeTable_live_data_complex_table.html
+++ b/cassandra/src/test/resources/scalate/DescribeTable_live_data_complex_table.html
@@ -1 +1 @@
-<br/><br/><nav class="navbar navbar-default"><ul class="nav navbar-nav"><li><a><strong>DESCRIBE TABLE live_data.complex_table;</strong></a></li></ul><ul class="nav navbar-nav navbar-right"><li class="dropdown"><a class="dropdown-toggle" data-toggle="dropdown" role="button" aria-haspopup="true" aria-expanded="false"><strong>Legend</strong><span class="caret"></span></a><ul class="dropdown-menu"><li><a role="button"><i class="glyphicon glyphicon-dashboard text-muted" /> Cluster</a></l [...]
\ No newline at end of file
+<br/><br/><nav class="navbar navbar-default"><ul class="nav navbar-nav"><li><a><strong>DESCRIBE TABLE live_data.complex_table;</strong></a></li></ul><ul class="nav navbar-nav navbar-right"><li class="dropdown"><a class="dropdown-toggle" data-toggle="dropdown" role="button" aria-haspopup="true" aria-expanded="false"><strong>Legend</strong><span class="caret"></span></a><ul class="dropdown-menu"><li><a role="button"><i class="glyphicon glyphicon-dashboard text-muted" /> Cluster</a></l [...]
diff --git a/cassandra/src/test/resources/scalate/DescribeTables.html b/cassandra/src/test/resources/scalate/DescribeTables.html
index cf69403..b6bfcba 100644
--- a/cassandra/src/test/resources/scalate/DescribeTables.html
+++ b/cassandra/src/test/resources/scalate/DescribeTables.html
@@ -1,313 +1,477 @@
<br/>
<br/>
<nav class="navbar navbar-default">
- <ul class="nav navbar-nav">
-
- <li role="presentation" class="dropdown">
- <a class="dropdown-toggle" data-toggle="dropdown" role="button" aria-haspopup="true" aria-expanded="false">
- <span class="text-muted"><i class="glyphicon glyphicon-dashboard"/> <strong>Test Cluster</strong></span>
- <span class="text-muted caret"></span>
- <ul class="dropdown-menu">
- <li class="dropdown-header"><span class="text-danger">Keyspaces</span></li>
- <li>
- <a role="button" data-toggle="collapse" data-target="#d0b77780-3518-11e5-86fc-8f0ea8ae1a37">
- <span class="text-danger"><i class="glyphicon glyphicon-folder-open"/> live_data</span>
- </a>
- </li>
- <li>
- <a role="button" data-toggle="collapse" data-target="#d0b7ecb0-3518-11e5-86fc-8f0ea8ae1a37">
- <span class="text-danger"><i class="glyphicon glyphicon-folder-open"/> samples</span>
- </a>
- </li>
- <li>
- <a role="button" data-toggle="collapse" data-target="#d0b7ecb1-3518-11e5-86fc-8f0ea8ae1a37">
- <span class="text-danger"><i class="glyphicon glyphicon-folder-open"/> system</span>
- </a>
- </li>
- <li>
- <a role="button" data-toggle="collapse" data-target="#d0b813c0-3518-11e5-86fc-8f0ea8ae1a37">
- <span class="text-danger"><i class="glyphicon glyphicon-folder-open"/> system_traces</span>
- </a>
- </li>
- <li>
- <a role="button" data-toggle="collapse" data-target="#d0b813c1-3518-11e5-86fc-8f0ea8ae1a37">
- <span class="text-danger"><i class="glyphicon glyphicon-folder-open"/> zeppelin</span>
- </a>
- </li>
- </ul>
+ <ul class="nav navbar-nav">
+ <li role="presentation" class="dropdown">
+ <a class="dropdown-toggle" data-toggle="dropdown" role="button" aria-haspopup="true" aria-expanded="false">
+ <span class="text-muted">
+ <i class="glyphicon glyphicon-dashboard"/> <strong>Test Cluster</strong>
+ </span>
+ <span class="text-muted caret">
+ </span>
+ <ul class="dropdown-menu">
+ <li class="dropdown-header">
+ <span class="text-danger">Keyspaces</span>
+ </li>
+ <li>
+ <a role="button" data-toggle="collapse" >
+ <span class="text-danger">
+ <i class="glyphicon glyphicon-folder-open"/> live_data</span>
</a>
+ </li>
+ <li>
+ <a role="button" data-toggle="collapse" >
+ <span class="text-danger">
+ <i class="glyphicon glyphicon-folder-open"/> system</span>
+ </a>
+ </li>
+ <li>
+ <a role="button" data-toggle="collapse" >
+ <span class="text-danger">
+ <i class="glyphicon glyphicon-folder-open"/> system_auth</span>
+ </a>
+ </li>
+ <li>
+ <a role="button" data-toggle="collapse" >
+ <span class="text-danger">
+ <i class="glyphicon glyphicon-folder-open"/> system_distributed</span>
+ </a>
+ </li>
+ <li>
+ <a role="button" data-toggle="collapse" >
+ <span class="text-danger">
+ <i class="glyphicon glyphicon-folder-open"/> system_schema</span>
+ </a>
+ </li>
+ <li>
+ <a role="button" data-toggle="collapse" >
+ <span class="text-danger">
+ <i class="glyphicon glyphicon-folder-open"/> system_traces</span>
+ </a>
+ </li>
+ <li>
+ <a role="button" data-toggle="collapse" >
+ <span class="text-danger">
+ <i class="glyphicon glyphicon-folder-open"/> zeppelin</span>
+ </a>
+ </li>
+ </ul>
+ </a>
+ </li>
+ <li>
+ <a>
+ <strong>DESCRIBE TABLES;</strong>
+ </a>
+ </li>
+ </ul>
+ <ul class="nav navbar-nav navbar-right">
+ <li class="dropdown">
+ <a class="dropdown-toggle" data-toggle="dropdown" role="button" aria-haspopup="true" aria-expanded="false">
+ <strong>Legend</strong>
+ <span class="caret">
+ </span>
+ </a>
+ <ul class="dropdown-menu">
+ <li>
+ <a role="button">
+ <i class="glyphicon glyphicon-dashboard text-muted" /> Cluster</a>
</li>
-
<li>
- <a><strong>DESCRIBE TABLES;</strong></a>
+ <a role="button">
+ <i class="glyphicon glyphicon-folder-open text-danger" /> Keyspace</a>
</li>
- </ul>
- <ul class="nav navbar-nav navbar-right">
- <li class="dropdown">
- <a class="dropdown-toggle" data-toggle="dropdown" role="button" aria-haspopup="true" aria-expanded="false">
- <strong>Legend</strong>
- <span class="caret"></span>
- </a>
- <ul class="dropdown-menu">
- <li>
- <a role="button">
- <i class="glyphicon glyphicon-dashboard text-muted" /> Cluster
- </a>
- </li>
- <li>
- <a role="button">
- <i class="glyphicon glyphicon-folder-open text-danger" /> Keyspace
- </a>
- </li>
- <li>
- <a role="button">
- <i class="glyphicon glyphicon-copyright-mark text-warning" /> UDT
- </a>
- </li>
- <li>
- <a role="button">
- <i class="glyphicon glyphicon-th-list text-primary" /> Table
- </a>
- </li>
- <li class="bg-info">
- <a role="button">
- <i class="glyphicon glyphicon-fullscreen" /> Partition Key
- </a>
- </li>
- <li class="bg-warning">
- <a role="button">
- <i class="glyphicon glyphicon-pushpin" /> Static Column
- </a>
- </li>
- <li class="bg-success">
- <a role="button">
- <i class="glyphicon glyphicon-sort" /> Clustering Column
- </a>
- </li>
- <li class="bg-success">
- <a role="button">
- <i class="glyphicon glyphicon-sort-by-attributes" /> Clustering Order ASC
- </a>
- </li>
- <li class="bg-success">
- <a role="button">
- <i class="glyphicon glyphicon-sort-by-attributes-alt" /> Clustering Order DESC
- </a>
- </li>
- <li>
- <a role="button">
- <i class="glyphicon glyphicon-info-sign" /> Indexed Column
- </a>
- </li>
- </ul>
+ <li>
+ <a role="button">
+ <i class="glyphicon glyphicon-copyright-mark text-warning" /> UDT</a>
+ </li>
+ <li>
+ <a role="button">
+ <i class="glyphicon glyphicon-th-list text-primary" /> Table</a>
</li>
<li>
- <a href="#"></a>
+ <a role="button">
+ <i class="glyphicon glyphicon-eye-open text-primary" /> Materialized View</a>
+ </li>
+ <li>
+ <a role="button">
+ <i class="glyphicon glyphicon-random text-success" /> Function</a>
+ </li>
+ <li>
+ <a role="button">
+ <i class="glyphicon glyphicon-retweet text-success" /> Aggregate</a>
+ </li>
+ <li role="separator" class="divider text-muted">
+ </li>
+ <li class="dropdown-header">
+ <span class="text-primary">Table icons</span>
+ </li>
+ <li class="bg-info">
+ <a role="button">
+ <i class="glyphicon glyphicon-fullscreen" /> Partition Key</a>
+ </li>
+ <li class="bg-warning">
+ <a role="button">
+ <i class="glyphicon glyphicon-pushpin" /> Static Column</a>
+ </li>
+ <li class="bg-success">
+ <a role="button">
+ <i class="glyphicon glyphicon-sort" /> Clustering Column</a>
+ </li>
+ <li class="bg-success">
+ <a role="button">
+ <i class="glyphicon glyphicon-sort-by-attributes" /> Clustering Order ASC</a>
+ </li>
+ <li class="bg-success">
+ <a role="button">
+ <i class="glyphicon glyphicon-sort-by-attributes-alt" /> Clustering Order DESC</a>
</li>
- </ul>
+ </ul>
+ </li>
+ <li>
+ <a href="#">
+ </a>
+ </li>
+ </ul>
</nav>
<hr/>
-
<div class="container">
-
- <div class="row">
- <div class="panel-group" role="tablist" aria-multiselectable="true">
-
- <div class="panel panel-default">
- <div class="panel-heading" role="tab">
- <h4 class="panel-title">
- <a role="button" data-toggle="collapse" data-target="#d0b77780-3518-11e5-86fc-8f0ea8ae1a37" aria-expanded="false">
- <span class="text-danger"><i class="glyphicon glyphicon-folder-open"/> live_data</span>
- </a>
- </h4>
- </div>
- <div id="d0b77780-3518-11e5-86fc-8f0ea8ae1a37" class="panel-collapse collapse" role="tabpanel">
- <div class="panel-body">
- <div class="row">
- <div class="col-md-2"/>
- <div class="col-md-8 col-offset-md-2 table-responsive table-bordered">
-
-
- <table class="table">
- <thead>
- <tr><th>Tables</th></tr>
- </thead>
- <tbody>
-
- <tr class="text-primary"><td>complex_table</td></tr>
-
- <tr class="text-primary"><td>sensor_data</td></tr>
-
- <tr class="text-primary"><td>stations</td></tr>
-
- </tbody>
- </table>
- </div>
- <div class="col-md-2"/>
- </div>
- </div>
- </div>
+ <div class="row">
+ <div class="panel-group" role="tablist" aria-multiselectable="true">
+ <div class="panel panel-default">
+ <div class="panel-heading" role="tab">
+ <h4 class="panel-title">
+ <a role="button" data-toggle="collapse" aria-expanded="false">
+ <span class="text-danger">
+ <i class="glyphicon glyphicon-folder-open"/> live_data</span>
+ </a>
+ </h4>
+ </div>
+ <div class="panel-collapse collapse" role="tabpanel">
+ <div class="panel-body">
+ <div class="row">
+ <div class="col-md-2"/>
+ <div class="col-md-8 col-offset-md-2 table-responsive table-bordered">
+ <table class="table">
+ <thead>
+ <tr>
+ <th>Tables</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr class="text-primary">
+ <td>complex_table</td>
+ </tr>
+ <tr class="text-primary">
+ <td>sensor_data</td>
+ </tr>
+ <tr class="text-primary">
+ <td>stations</td>
+ </tr>
+ </tbody>
+ </table>
+ </div>
+ <div class="col-md-2"/>
+ </div>
+ </div>
+ </div>
+ </div>
+ <div class="panel panel-default">
+ <div class="panel-heading" role="tab">
+ <h4 class="panel-title">
+ <a role="button" data-toggle="collapse" aria-expanded="false">
+ <span class="text-danger">
+ <i class="glyphicon glyphicon-folder-open"/> system</span>
+ </a>
+ </h4>
+ </div>
+ <div class="panel-collapse collapse" role="tabpanel">
+ <div class="panel-body">
+ <div class="row">
+ <div class="col-md-2"/>
+ <div class="col-md-8 col-offset-md-2 table-responsive table-bordered">
+ <table class="table">
+ <thead>
+ <tr>
+ <th>Tables</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr class="text-primary">
+ <td>"IndexInfo"</td>
+ </tr>
+ <tr class="text-primary">
+ <td>available_ranges</td>
+ </tr>
+ <tr class="text-primary">
+ <td>batches</td>
+ </tr>
+ <tr class="text-primary">
+ <td>batchlog</td>
+ </tr>
+ <tr class="text-primary">
+ <td>built_views</td>
+ </tr>
+ <tr class="text-primary">
+ <td>compaction_history</td>
+ </tr>
+ <tr class="text-primary">
+ <td>hints</td>
+ </tr>
+ <tr class="text-primary">
+ <td>local</td>
+ </tr>
+ <tr class="text-primary">
+ <td>paxos</td>
+ </tr>
+ <tr class="text-primary">
+ <td>peer_events</td>
+ </tr>
+ <tr class="text-primary">
+ <td>peers</td>
+ </tr>
+ <tr class="text-primary">
+ <td>prepared_statements</td>
+ </tr>
+ <tr class="text-primary">
+ <td>range_xfers</td>
+ </tr>
+ <tr class="text-primary">
+ <td>size_estimates</td>
+ </tr>
+ <tr class="text-primary">
+ <td>sstable_activity</td>
+ </tr>
+ <tr class="text-primary">
+ <td>transferred_ranges</td>
+ </tr>
+ <tr class="text-primary">
+ <td>views_builds_in_progress</td>
+ </tr>
+ </tbody>
+ </table>
+ </div>
+ <div class="col-md-2"/>
</div>
-
- <div class="panel panel-default">
- <div class="panel-heading" role="tab">
- <h4 class="panel-title">
- <a role="button" data-toggle="collapse" data-target="#d0b7ecb0-3518-11e5-86fc-8f0ea8ae1a37" aria-expanded="false">
- <span class="text-danger"><i class="glyphicon glyphicon-folder-open"/> samples</span>
- </a>
- </h4>
- </div>
- <div id="d0b7ecb0-3518-11e5-86fc-8f0ea8ae1a37" class="panel-collapse collapse" role="tabpanel">
- <div class="panel-body">
- <div class="row">
- <div class="col-md-2"/>
- <div class="col-md-8 col-offset-md-2 table-responsive table-bordered">
-
- <span><h4>No Table</h4></span>
-
- </div>
- <div class="col-md-2"/>
- </div>
- </div>
- </div>
+ </div>
+ </div>
+ </div>
+ <div class="panel panel-default">
+ <div class="panel-heading" role="tab">
+ <h4 class="panel-title">
+ <a role="button" data-toggle="collapse" aria-expanded="false">
+ <span class="text-danger">
+ <i class="glyphicon glyphicon-folder-open"/> system_auth</span>
+ </a>
+ </h4>
+ </div>
+ <div class="panel-collapse collapse" role="tabpanel">
+ <div class="panel-body">
+ <div class="row">
+ <div class="col-md-2"/>
+ <div class="col-md-8 col-offset-md-2 table-responsive table-bordered">
+ <table class="table">
+ <thead>
+ <tr>
+ <th>Tables</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr class="text-primary">
+ <td>resource_role_permissons_index</td>
+ </tr>
+ <tr class="text-primary">
+ <td>role_members</td>
+ </tr>
+ <tr class="text-primary">
+ <td>role_permissions</td>
+ </tr>
+ <tr class="text-primary">
+ <td>roles</td>
+ </tr>
+ </tbody>
+ </table>
+ </div>
+ <div class="col-md-2"/>
+ </div>
+ </div>
+ </div>
+ </div>
+ <div class="panel panel-default">
+ <div class="panel-heading" role="tab">
+ <h4 class="panel-title">
+ <a role="button" data-toggle="collapse" aria-expanded="false">
+ <span class="text-danger">
+ <i class="glyphicon glyphicon-folder-open"/> system_distributed</span>
+ </a>
+ </h4>
+ </div>
+ <div class="panel-collapse collapse" role="tabpanel">
+ <div class="panel-body">
+ <div class="row">
+ <div class="col-md-2"/>
+ <div class="col-md-8 col-offset-md-2 table-responsive table-bordered">
+ <table class="table">
+ <thead>
+ <tr>
+ <th>Tables</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr class="text-primary">
+ <td>parent_repair_history</td>
+ </tr>
+ <tr class="text-primary">
+ <td>repair_history</td>
+ </tr>
+ <tr class="text-primary">
+ <td>view_build_status</td>
+ </tr>
+ </tbody>
+ </table>
+ </div>
+ <div class="col-md-2"/>
</div>
-
- <div class="panel panel-default">
- <div class="panel-heading" role="tab">
- <h4 class="panel-title">
- <a role="button" data-toggle="collapse" data-target="#d0b7ecb1-3518-11e5-86fc-8f0ea8ae1a37" aria-expanded="false">
- <span class="text-danger"><i class="glyphicon glyphicon-folder-open"/> system</span>
- </a>
- </h4>
- </div>
- <div id="d0b7ecb1-3518-11e5-86fc-8f0ea8ae1a37" class="panel-collapse collapse" role="tabpanel">
- <div class="panel-body">
- <div class="row">
- <div class="col-md-2"/>
- <div class="col-md-8 col-offset-md-2 table-responsive table-bordered">
-
-
- <table class="table">
- <thead>
- <tr><th>Tables</th></tr>
- </thead>
- <tbody>
-
- <tr class="text-primary"><td>IndexInfo</td></tr>
-
- <tr class="text-primary"><td>batchlog</td></tr>
-
- <tr class="text-primary"><td>compaction_history</td></tr>
-
- <tr class="text-primary"><td>compactions_in_progress</td></tr>
-
- <tr class="text-primary"><td>hints</td></tr>
-
- <tr class="text-primary"><td>local</td></tr>
-
- <tr class="text-primary"><td>paxos</td></tr>
-
- <tr class="text-primary"><td>peer_events</td></tr>
-
- <tr class="text-primary"><td>peers</td></tr>
-
- <tr class="text-primary"><td>range_xfers</td></tr>
-
- <tr class="text-primary"><td>schema_columnfamilies</td></tr>
-
- <tr class="text-primary"><td>schema_columns</td></tr>
-
- <tr class="text-primary"><td>schema_keyspaces</td></tr>
-
- <tr class="text-primary"><td>schema_triggers</td></tr>
-
- <tr class="text-primary"><td>schema_usertypes</td></tr>
-
- <tr class="text-primary"><td>size_estimates</td></tr>
-
- <tr class="text-primary"><td>sstable_activity</td></tr>
-
- </tbody>
- </table>
- </div>
- <div class="col-md-2"/>
- </div>
- </div>
- </div>
+ </div>
+ </div>
+ </div>
+ <div class="panel panel-default">
+ <div class="panel-heading" role="tab">
+ <h4 class="panel-title">
+ <a role="button" data-toggle="collapse" aria-expanded="false">
+ <span class="text-danger">
+ <i class="glyphicon glyphicon-folder-open"/> system_schema</span>
+ </a>
+ </h4>
+ </div>
+ <div class="panel-collapse collapse" role="tabpanel">
+ <div class="panel-body">
+ <div class="row">
+ <div class="col-md-2"/>
+ <div class="col-md-8 col-offset-md-2 table-responsive table-bordered">
+ <table class="table">
+ <thead>
+ <tr>
+ <th>Tables</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr class="text-primary">
+ <td>aggregates</td>
+ </tr>
+ <tr class="text-primary">
+ <td>columns</td>
+ </tr>
+ <tr class="text-primary">
+ <td>dropped_columns</td>
+ </tr>
+ <tr class="text-primary">
+ <td>functions</td>
+ </tr>
+ <tr class="text-primary">
+ <td>indexes</td>
+ </tr>
+ <tr class="text-primary">
+ <td>keyspaces</td>
+ </tr>
+ <tr class="text-primary">
+ <td>tables</td>
+ </tr>
+ <tr class="text-primary">
+ <td>triggers</td>
+ </tr>
+ <tr class="text-primary">
+ <td>types</td>
+ </tr>
+ <tr class="text-primary">
+ <td>views</td>
+ </tr>
+ </tbody>
+ </table>
+ </div>
+ <div class="col-md-2"/>
</div>
-
- <div class="panel panel-default">
- <div class="panel-heading" role="tab">
- <h4 class="panel-title">
- <a role="button" data-toggle="collapse" data-target="#d0b813c0-3518-11e5-86fc-8f0ea8ae1a37" aria-expanded="false">
- <span class="text-danger"><i class="glyphicon glyphicon-folder-open"/> system_traces</span>
- </a>
- </h4>
- </div>
- <div id="d0b813c0-3518-11e5-86fc-8f0ea8ae1a37" class="panel-collapse collapse" role="tabpanel">
- <div class="panel-body">
- <div class="row">
- <div class="col-md-2"/>
- <div class="col-md-8 col-offset-md-2 table-responsive table-bordered">
-
-
- <table class="table">
- <thead>
- <tr><th>Tables</th></tr>
- </thead>
- <tbody>
-
- <tr class="text-primary"><td>events</td></tr>
-
- <tr class="text-primary"><td>sessions</td></tr>
-
- </tbody>
- </table>
- </div>
- <div class="col-md-2"/>
- </div>
- </div>
- </div>
+ </div>
+ </div>
+ </div>
+ <div class="panel panel-default">
+ <div class="panel-heading" role="tab">
+ <h4 class="panel-title">
+ <a role="button" data-toggle="collapse" aria-expanded="false">
+ <span class="text-danger">
+ <i class="glyphicon glyphicon-folder-open"/> system_traces</span>
+ </a>
+ </h4>
+ </div>
+ <div class="panel-collapse collapse" role="tabpanel">
+ <div class="panel-body">
+ <div class="row">
+ <div class="col-md-2"/>
+ <div class="col-md-8 col-offset-md-2 table-responsive table-bordered">
+ <table class="table">
+ <thead>
+ <tr>
+ <th>Tables</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr class="text-primary">
+ <td>events</td>
+ </tr>
+ <tr class="text-primary">
+ <td>sessions</td>
+ </tr>
+ </tbody>
+ </table>
+ </div>
+ <div class="col-md-2"/>
</div>
-
- <div class="panel panel-default">
- <div class="panel-heading" role="tab">
- <h4 class="panel-title">
- <a role="button" data-toggle="collapse" data-target="#d0b813c1-3518-11e5-86fc-8f0ea8ae1a37" aria-expanded="false">
- <span class="text-danger"><i class="glyphicon glyphicon-folder-open"/> zeppelin</span>
- </a>
- </h4>
- </div>
- <div id="d0b813c1-3518-11e5-86fc-8f0ea8ae1a37" class="panel-collapse collapse" role="tabpanel">
- <div class="panel-body">
- <div class="row">
- <div class="col-md-2"/>
- <div class="col-md-8 col-offset-md-2 table-responsive table-bordered">
-
-
- <table class="table">
- <thead>
- <tr><th>Tables</th></tr>
- </thead>
- <tbody>
-
- <tr class="text-primary"><td>artists</td></tr>
-
- <tr class="text-primary"><td>prepared</td></tr>
-
- <tr class="text-primary"><td>ts</td></tr>
-
- <tr class="text-primary"><td>users</td></tr>
-
- </tbody>
- </table>
- </div>
- <div class="col-md-2"/>
- </div>
- </div>
- </div>
+ </div>
+ </div>
+ </div>
+ <div class="panel panel-default">
+ <div class="panel-heading" role="tab">
+ <h4 class="panel-title">
+ <a role="button" data-toggle="collapse" aria-expanded="false">
+ <span class="text-danger">
+ <i class="glyphicon glyphicon-folder-open"/> zeppelin</span>
+ </a>
+ </h4>
+ </div>
+ <div class="panel-collapse collapse" role="tabpanel">
+ <div class="panel-body">
+ <div class="row">
+ <div class="col-md-2"/>
+ <div class="col-md-8 col-offset-md-2 table-responsive table-bordered">
+ <table class="table">
+ <thead>
+ <tr>
+ <th>Tables</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr class="text-primary">
+ <td>artists</td>
+ </tr>
+ <tr class="text-primary">
+ <td>no_select</td>
+ </tr>
+ <tr class="text-primary">
+ <td>prepared</td>
+ </tr>
+ <tr class="text-primary">
+ <td>ts</td>
+ </tr>
+ <tr class="text-primary">
+ <td>users</td>
+ </tr>
+ </tbody>
+ </table>
+ </div>
+ <div class="col-md-2"/>
</div>
-
+ </div>
</div>
+ </div>
</div>
-</div>
\ No newline at end of file
+ </div>
+</div>
diff --git a/cassandra/src/test/resources/scalate/DescribeTypes.html b/cassandra/src/test/resources/scalate/DescribeTypes.html
new file mode 100644
index 0000000..d13af85
--- /dev/null
+++ b/cassandra/src/test/resources/scalate/DescribeTypes.html
@@ -0,0 +1,179 @@
+<br/>
+<br/>
+<nav class="navbar navbar-default">
+ <ul class="nav navbar-nav">
+ <li role="presentation" class="dropdown">
+ <a class="dropdown-toggle" data-toggle="dropdown" role="button" aria-haspopup="true" aria-expanded="false">
+ <span class="text-muted">
+ <i class="glyphicon glyphicon-dashboard"/> <strong>Test Cluster</strong>
+ </span>
+ <span class="text-muted caret">
+ </span>
+ <ul class="dropdown-menu">
+ <li class="dropdown-header">
+ <span class="text-danger">Keyspaces</span>
+ </li>
+ <li>
+ <a role="button" data-toggle="collapse" >
+ <span class="text-danger">
+ <i class="glyphicon glyphicon-folder-open"/> live_data</span>
+ </a>
+ </li>
+ <li>
+ <a role="button" data-toggle="collapse" >
+ <span class="text-danger">
+ <i class="glyphicon glyphicon-folder-open"/> zeppelin</span>
+ </a>
+ </li>
+ </ul>
+ </a>
+ </li>
+ <li>
+ <a>
+ <strong>DESCRIBE TYPES;</strong>
+ </a>
+ </li>
+ </ul>
+ <ul class="nav navbar-nav navbar-right">
+ <li class="dropdown">
+ <a class="dropdown-toggle" data-toggle="dropdown" role="button" aria-haspopup="true" aria-expanded="false">
+ <strong>Legend</strong>
+ <span class="caret">
+ </span>
+ </a>
+ <ul class="dropdown-menu">
+ <li>
+ <a role="button">
+ <i class="glyphicon glyphicon-dashboard text-muted" /> Cluster</a>
+ </li>
+ <li>
+ <a role="button">
+ <i class="glyphicon glyphicon-folder-open text-danger" /> Keyspace</a>
+ </li>
+ <li>
+ <a role="button">
+ <i class="glyphicon glyphicon-copyright-mark text-warning" /> UDT</a>
+ </li>
+ <li>
+ <a role="button">
+ <i class="glyphicon glyphicon-th-list text-primary" /> Table</a>
+ </li>
+ <li>
+ <a role="button">
+ <i class="glyphicon glyphicon-eye-open text-primary" /> Materialized View</a>
+ </li>
+ <li>
+ <a role="button">
+ <i class="glyphicon glyphicon-random text-success" /> Function</a>
+ </li>
+ <li>
+ <a role="button">
+ <i class="glyphicon glyphicon-retweet text-success" /> Aggregate</a>
+ </li>
+ <li role="separator" class="divider text-muted">
+ </li>
+ <li class="dropdown-header">
+ <span class="text-primary">Table icons</span>
+ </li>
+ <li class="bg-info">
+ <a role="button">
+ <i class="glyphicon glyphicon-fullscreen" /> Partition Key</a>
+ </li>
+ <li class="bg-warning">
+ <a role="button">
+ <i class="glyphicon glyphicon-pushpin" /> Static Column</a>
+ </li>
+ <li class="bg-success">
+ <a role="button">
+ <i class="glyphicon glyphicon-sort" /> Clustering Column</a>
+ </li>
+ <li class="bg-success">
+ <a role="button">
+ <i class="glyphicon glyphicon-sort-by-attributes" /> Clustering Order ASC</a>
+ </li>
+ <li class="bg-success">
+ <a role="button">
+ <i class="glyphicon glyphicon-sort-by-attributes-alt" /> Clustering Order DESC</a>
+ </li>
+ </ul>
+ </li>
+ <li>
+ <a href="#">
+ </a>
+ </li>
+ </ul>
+</nav>
+<hr/>
+<div class="container">
+ <div class="row">
+ <div class="panel-group" role="tablist" aria-multiselectable="true">
+ <div class="panel panel-default">
+ <div class="panel-heading" role="tab">
+ <h4 class="panel-title">
+ <a role="button" data-toggle="collapse" aria-expanded="false">
+ <span class="text-danger">
+ <i class="glyphicon glyphicon-folder-open"/> live_data</span>
+ </a>
+ </h4>
+ </div>
+ <div class="panel-collapse collapse" role="tabpanel">
+ <div class="panel-body">
+ <div class="row">
+ <div class="col-md-2"/>
+ <div class="col-md-8 col-offset-md-2 table-responsive table-bordered">
+ <table class="table">
+ <thead>
+ <tr>
+ <th>UDT</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr class="text-warning">
+ <td>address</td>
+ </tr>
+ <tr class="text-warning">
+ <td>geolocation</td>
+ </tr>
+ </tbody>
+ </table>
+ </div>
+ <div class="col-md-2"/>
+ </div>
+ </div>
+ </div>
+ </div>
+ <div class="panel panel-default">
+ <div class="panel-heading" role="tab">
+ <h4 class="panel-title">
+ <a role="button" data-toggle="collapse" aria-expanded="false">
+ <span class="text-danger">
+ <i class="glyphicon glyphicon-folder-open"/> zeppelin</span>
+ </a>
+ </h4>
+ </div>
+ <div class="panel-collapse collapse" role="tabpanel">
+ <div class="panel-body">
+ <div class="row">
+ <div class="col-md-2"/>
+ <div class="col-md-8 col-offset-md-2 table-responsive table-bordered">
+ <table class="table">
+ <thead>
+ <tr>
+ <th>UDT</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr class="text-warning">
+ <td>address</td>
+ </tr>
+ </tbody>
+ </table>
+ </div>
+ <div class="col-md-2"/>
+ </div>
+ </div>
+ </div>
+ </div>
+ </div>
+ </div>
+</div>
diff --git a/cassandra/src/test/resources/scalate/Help.html b/cassandra/src/test/resources/scalate/Help.html
index d784fe3..ce702ab 100644
--- a/cassandra/src/test/resources/scalate/Help.html
+++ b/cassandra/src/test/resources/scalate/Help.html
@@ -1 +1 @@
-<br/><br/><nav class="navbar navbar-default"><ul class="nav navbar-nav"><li role="presentation" class="dropdown"><a class="dropdown-toggle" data-toggle="dropdown" role="button" aria-haspopup="true" aria-expanded="false"><span class="text-info"><i class="glyphicon glyphicon-book"/> <strong>Please select ...</strong></span><span class="text-info caret"></span><ul class="dropdown-menu"><li class="dropdown-header"><span class="text-info">Topics</span></li><li><a role="button" data-toggl [...]
\ No newline at end of file
+<br/><br/><nav class="navbar navbar-default"><ul class="nav navbar-nav"><li role="presentation" class="dropdown"><a class="dropdown-toggle" data-toggle="dropdown" role="button" aria-haspopup="true" aria-expanded="false"><span class="text-info"><i class="glyphicon glyphicon-book"/> <strong>Please select ...</strong></span><span class="text-info caret"></span><ul class="dropdown-menu"><li class="dropdown-header"><span class="text-info">Topics</span></li><li><a role="button" data-toggl [...]
\ No newline at end of file
diff --git a/cassandra/src/test/resources/scalate/NoResultWithExecutionInfo.html b/cassandra/src/test/resources/scalate/NoResultWithExecutionInfo.html
index c8975c8..9ca4d45 100644
--- a/cassandra/src/test/resources/scalate/NoResultWithExecutionInfo.html
+++ b/cassandra/src/test/resources/scalate/NoResultWithExecutionInfo.html
@@ -1,42 +1 @@
-<div class="container">
- <div class="row text-center">
- <h4>No Result</h4>
- </div>
- <br/>
- <div class="row">
- <div class="col-md-3"></div>
- <div class="col-md-6 col-offset-md-3 table-responsive table-bordered">
- <table class="table">
- <caption><h5>Last query execution info</h5></caption>
- <thead>
- <tr>
- <th>Info</th>
- <th>Value</th>
- </tr>
- </thead>
- <tbody>
- <tr>
- <td>Statement</td>
- <td>CREATE TABLE IF NOT EXISTS no_select(id int PRIMARY KEY);</td>
- </tr>
- <tr>
- <td>Achieved Consistency</td>
- <td>N/A</td>
- </tr>
- <tr>
- <td>Tried Hosts</td>
- <td>TRIED_HOSTS</td>
- </tr>
- <tr>
- <td>Queried Hosts</td>
- <td>QUERIED_HOSTS</td>
- </tr>
- <tr>
- <td>Schema In Agreement</td>
- <td>true</td>
- </tr>
- </tbody>
- </table>
- </div>
- </div>
-</div>
\ No newline at end of file
+<div class="container"><div class="row text-center"><h4>No Result</h4></div><br/><div class="row"><div class="col-md-3"></div><div class="col-md-6 col-offset-md-3 table-responsive table-bordered"><table class="table"><caption><h5>Last query execution info</h5></caption><thead><tr><th>Info</th><th>Value</th></tr></thead><tbody><tr><td>Statement</td><td>CREATE TABLE IF NOT EXISTS no_select(id int PRIMARY KEY);</td></tr><tr><td>Tried Hosts</td><td>localhost:9142</td></tr><tr><td>Queried Hos [...]
\ No newline at end of file
diff --git a/cassandra/src/test/scala/org/apache/zeppelin/cassandra/EnhancedSessionTest.scala b/cassandra/src/test/scala/org/apache/zeppelin/cassandra/EnhancedSessionTest.scala
index c9ba19f..006fc14 100644
--- a/cassandra/src/test/scala/org/apache/zeppelin/cassandra/EnhancedSessionTest.scala
+++ b/cassandra/src/test/scala/org/apache/zeppelin/cassandra/EnhancedSessionTest.scala
@@ -17,7 +17,7 @@
package org.apache.zeppelin.cassandra
-import com.datastax.driver.core.{BatchStatement, SimpleStatement}
+import com.datastax.oss.driver.api.core.cql.{BatchStatement, BatchType, SimpleStatement}
import org.scalatest.FlatSpec
class EnhancedSessionTest extends FlatSpec {
@@ -48,24 +48,24 @@ class EnhancedSessionTest extends FlatSpec {
it should "be detected as DDL for create in simple statement" in {
assertResult(true) {
- EnhancedSession.isDDLStatement(new SimpleStatement("create TABLE if not exists test.test(id int primary key);"))
+ EnhancedSession.isDDLStatement(SimpleStatement.newInstance("create TABLE if not exists test.test(id int primary key);"))
}
}
it should "be detected as DDL for create in batch statement" in {
- val batch = new BatchStatement
- batch.add(new SimpleStatement("create TABLE if not exists test.test(id int primary key);"))
- batch.add(new SimpleStatement("insert into test.test(id) values(1);"))
+ val batch = BatchStatement.newInstance(BatchType.UNLOGGED)
+ .add(SimpleStatement.newInstance("create TABLE if not exists test.test(id int primary key);"))
+ .add(SimpleStatement.newInstance("insert into test.test(id) values(1);"))
assertResult(true) {
EnhancedSession.isDDLStatement(batch)
}
}
it should "not be detected as DDL for only inserts in batch statement" in {
- val batch = new BatchStatement
- batch.add(new SimpleStatement("insert into test.test(id) values(1);"))
- batch.add(new SimpleStatement("insert into test.test(id) values(2);"))
- batch.add(new SimpleStatement("insert into test.test(id) values(3);"))
+ val batch = BatchStatement.newInstance(BatchType.LOGGED)
+ .add(SimpleStatement.newInstance("insert into test.test(id) values(1);"))
+ .add(SimpleStatement.newInstance("insert into test.test(id) values(2);"))
+ .add(SimpleStatement.newInstance("insert into test.test(id) values(3);"))
assertResult(false) {
EnhancedSession.isDDLStatement(batch)
}
diff --git a/cassandra/src/test/scala/org/apache/zeppelin/cassandra/ParagraphParserTest.scala b/cassandra/src/test/scala/org/apache/zeppelin/cassandra/ParagraphParserTest.scala
index 4c5c929..7998390 100644
--- a/cassandra/src/test/scala/org/apache/zeppelin/cassandra/ParagraphParserTest.scala
+++ b/cassandra/src/test/scala/org/apache/zeppelin/cassandra/ParagraphParserTest.scala
@@ -16,7 +16,8 @@
*/
package org.apache.zeppelin.cassandra
-import com.datastax.driver.core._
+import com.datastax.oss.driver.api.core.{ConsistencyLevel, CqlSession}
+import com.datastax.oss.driver.api.core.cql.{BatchStatement, BatchType, PreparedStatement}
import org.apache.zeppelin.interpreter.InterpreterException
import org.scalatest.mock.MockitoSugar
import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
@@ -30,7 +31,7 @@ class ParagraphParserTest extends FlatSpec
with Matchers
with MockitoSugar {
- val session: Session = mock[Session]
+ val session: CqlSession = mock[CqlSession]
val preparedStatements:collection.mutable.Map[String,PreparedStatement] = collection.mutable.Map()
val parser: ParagraphParser = new ParagraphParser()
@@ -62,14 +63,14 @@ class ParagraphParserTest extends FlatSpec
parsed should matchPattern {
case parser.Success(List(
SimpleStm("SELECT * FROM albums LIMIT 10;"),
- BatchStm(BatchStatement.Type.UNLOGGED,
+ BatchStm(BatchType.UNLOGGED,
List(
SimpleStm("INSERT INTO users(id) VALUES(10);"),
BoundStm("test","'a',12.34")
)
),
SimpleStm("SELECT * FROM users LIMIT 10;"),
- BatchStm(BatchStatement.Type.LOGGED,
+ BatchStm(BatchType.LOGGED,
List(
SimpleStm("Insert INTO users(id) VALUES(11);"),
SimpleStm("INSERT INTO users(id) VALUES(12);")
@@ -162,20 +163,6 @@ class ParagraphParserTest extends FlatSpec
ex.getMessage should be(s"Invalid syntax for @timestamp. It should comply to the pattern ${TIMESTAMP_PATTERN.toString}")
}
- "Parser" should "parse retry policy" in {
- val query:String ="@retryPolicy="+CassandraInterpreter.DOWNGRADING_CONSISTENCY_RETRY
- val parsed = parser.parseAll(parser.retryPolicy, query)
- parsed should matchPattern {case parser.Success(DowngradingRetryPolicy, _) => }
- }
-
- "Parser" should "fails parsing invalid retry policy" in {
- val query:String =""" @retryPolicy=TEST""".stripMargin
- val ex = intercept[InterpreterException] {
- parser.parseAll(parser.retryPolicy, query)
- }
- ex.getMessage should be(s"Invalid syntax for @retryPolicy. It should comply to the pattern ${RETRY_POLICIES_PATTERN.toString}")
- }
-
"Parser" should "parse fetch size" in {
val query:String ="@fetchSize=100"
val parsed = parser.parseAll(parser.fetchSize, query)
@@ -201,7 +188,7 @@ class ParagraphParserTest extends FlatSpec
val query:String =""" sElecT * FROM users LIMIT ? ;""".stripMargin
//When
- val parsed = parser.parseAll(parser.genericStatement, query)
+ val parsed = parser.parseAll(parser.genericStatement(), query)
//Then
parsed should matchPattern { case parser.Success(SimpleStm("sElecT * FROM users LIMIT ? ;"), _) =>}
@@ -212,7 +199,7 @@ class ParagraphParserTest extends FlatSpec
val query:String =""" @prepare[select_users]=SELECT * FROM users LIMIT ? """.stripMargin
//When
- val parsed = parser.parseAll(parser.prepare, query)
+ val parsed = parser.parseAll(parser.prepare(), query)
//Then
parsed should matchPattern { case parser.Success(PrepareStm("select_users","SELECT * FROM users LIMIT ?"), _) => }
@@ -221,7 +208,7 @@ class ParagraphParserTest extends FlatSpec
"Parser" should "fails parsing invalid prepared statement" in {
val query:String =""" @prepare=SELECT * FROM users LIMIT ?""".stripMargin
val ex = intercept[InterpreterException] {
- parser.parseAll(parser.prepare, query)
+ parser.parseAll(parser.prepare(), query)
}
ex.getMessage should be(s"Invalid syntax for @prepare. It should comply to the pattern: @prepare[prepared_statement_name]=CQL Statement (without semi-colon)")
}
@@ -231,7 +218,7 @@ class ParagraphParserTest extends FlatSpec
val query:String =""" @remove_prepare[select_users ]""".stripMargin
//When
- val parsed = parser.parseAll(parser.removePrepare, query)
+ val parsed = parser.parseAll(parser.removePrepare(), query)
//Then
parsed should matchPattern { case parser.Success(RemovePrepareStm("select_users"), _) => }
@@ -240,7 +227,7 @@ class ParagraphParserTest extends FlatSpec
"Parser" should "fails parsing invalid remove prepared statement" in {
val query:String =""" @remove_prepare[select_users]=SELECT * FROM users LIMIT ?""".stripMargin
val ex = intercept[InterpreterException] {
- parser.parseAll(parser.removePrepare, query)
+ parser.parseAll(parser.removePrepare(), query)
}
ex.getMessage should be(s"Invalid syntax for @remove_prepare. It should comply to the pattern: @remove_prepare[prepared_statement_name]")
}
@@ -250,7 +237,7 @@ class ParagraphParserTest extends FlatSpec
val query:String =""" @bind[select_users ]=10,'toto'""".stripMargin
//When
- val parsed = parser.parseAll(parser.bind, query)
+ val parsed = parser.parseAll(parser.bind(), query)
//Then
parsed should matchPattern { case parser.Success(BoundStm("select_users","10,'toto'"), _) => }
@@ -259,7 +246,7 @@ class ParagraphParserTest extends FlatSpec
"Parser" should "fails parsing invalid bind statement" in {
val query:String =""" @bind[select_users]=""".stripMargin
val ex = intercept[InterpreterException] {
- parser.parseAll(parser.bind, query)
+ parser.parseAll(parser.bind(), query)
}
ex.getMessage should be("""Invalid syntax for @bind. It should comply to the pattern: @bind[prepared_statement_name]=10,'jdoe','John DOE',12345,'2015-07-32 12:04:23.234' OR @bind[prepared_statement_name] with no bound value. No semi-colon""")
}
@@ -280,7 +267,7 @@ class ParagraphParserTest extends FlatSpec
//Then
parsed should matchPattern {
case parser.Success(BatchStm(
- BatchStatement.Type.LOGGED,
+ BatchType.LOGGED,
List(
SimpleStm("Insert INTO users(id) VALUES(10);"),
BoundStm("select_users", "10,'toto'"),
@@ -346,7 +333,6 @@ class ParagraphParserTest extends FlatSpec
" INSERT INTO zeppelin.albums(title,artist,year) VALUES('Primitive','Soulfly',2003);\n"+
"APPLY BATCH;\n"+
"@timestamp=10\n" +
- "@retryPolicy=DOWNGRADING_CONSISTENCY\n" +
"SELECT * FROM zeppelin.albums;"
val parsed = parser.parseAll(parser.queries, query)
@@ -356,7 +342,7 @@ class ParagraphParserTest extends FlatSpec
SimpleStm("CREATE TABLE IF NOT EXISTS zeppelin.albums(\n title text PRIMARY KEY,\n artist text,\n year int\n);"),
Consistency(ConsistencyLevel.THREE),
SerialConsistency(ConsistencyLevel.SERIAL),
- BatchStm(BatchStatement.Type.LOGGED,
+ BatchStm(BatchType.LOGGED,
List(
SimpleStm("INSERT INTO zeppelin.albums(title,artist,year) VALUES('The Impossible Dream EP','Carter the Unstoppable Sex Machine',1992);"),
SimpleStm("INSERT INTO zeppelin.albums(title,artist,year) VALUES('The Way You Are','Tears for Fears',1983);"),
@@ -364,7 +350,6 @@ class ParagraphParserTest extends FlatSpec
)
),
Timestamp(10L),
- DowngradingRetryPolicy,
SimpleStm("SELECT * FROM zeppelin.albums;")
), _) =>
}
@@ -389,7 +374,7 @@ class ParagraphParserTest extends FlatSpec
parsed should matchPattern {
case parser.Success(List(
SimpleStm("CREATE TABLE IF NOT EXISTS zeppelin.albums(\n title text PRIMARY KEY,\n artist text,\n year int\n);"),
- BatchStm(BatchStatement.Type.LOGGED,
+ BatchStm(BatchType.LOGGED,
List(
SimpleStm("INSERT INTO zeppelin.albums(title,artist,year) VALUES('The Impossible Dream EP','Carter the Unstoppable Sex Machine',1992);"),
SimpleStm("INSERT INTO zeppelin.albums(title,artist,year) VALUES('The Way You Are','Tears for Fears',1983);"),
diff --git a/docs/interpreter/cassandra.md b/docs/interpreter/cassandra.md
index 3e53725..1837c53 100644
--- a/docs/interpreter/cassandra.md
+++ b/docs/interpreter/cassandra.md
@@ -79,7 +79,7 @@ The **Cassandra** interpreter accepts the following commands
</tr>
<tr>
<td nowrap>Option commands</td>
- <td>`@consistency`, `@retryPolicy`, `@fetchSize` ...</td>
+ <td>`@consistency`, `@fetchSize` ...</td>
<td>Inject runtime options to all statements in the paragraph</td>
</tr>
<tr>
@@ -113,9 +113,8 @@ Each statement should be separated by a semi-colon ( **;** ) except the special
4. `@consistency`
5. `@serialConsistency`
6. `@timestamp`
-7. `@retryPolicy`
-8. `@fetchSize`
-9. `@requestTimeOut`
+7. `@fetchSize`
+8. `@requestTimeOut`
Multi-line statements as well as multiple statements on the same line are also supported as long as they are separated by a semi-colon. Ex:
@@ -165,8 +164,8 @@ The complete list of all CQL statements and versions can be found below:
<td><strong>3.x</strong></td>
<td>
<a target="_blank"
- href="http://docs.datastax.com/en/cql/3.3/cql/cqlIntro.html">
- http://docs.datastax.com/en/cql/3.3/cql/cqlIntro.html
+ href="https://docs.datastax.com/en/archived/cql/3.3/cql/cqlIntro.html">
+ https://docs.datastax.com/en/archived/cql/3.3/cql/cqlIntro.html
</a>
</td>
</tr>
@@ -174,8 +173,8 @@ The complete list of all CQL statements and versions can be found below:
<td><strong>2.2</strong></td>
<td>
<a target="_blank"
- href="http://docs.datastax.com/en/cql/3.3/cql/cqlIntro.html">
- http://docs.datastax.com/en/cql/3.3/cql/cqlIntro.html
+ href="https://docs.datastax.com/en/archived/cql/3.3/cql/cqlIntro.html">
+ https://docs.datastax.com/en/archived/cql/3.3/cql/cqlIntro.html
</a>
</td>
</tr>
@@ -192,8 +191,8 @@ The complete list of all CQL statements and versions can be found below:
<td><strong>1.2</strong></td>
<td>
<a target="_blank"
- href="http://docs.datastax.com/en/cql/3.0/cql/aboutCQL.html">
- http://docs.datastax.com/en/cql/3.0/cql/aboutCQL.html
+ href="https://docs.datastax.com/en/archived/cql/3.1/cql/cql_intro_c.html">
+ https://docs.datastax.com/en/archived/cql/3.1/cql/cql_intro_c.html
</a>
</td>
</tr>
@@ -351,11 +350,6 @@ Below is the list of all parameters:
</td>
</tr>
<tr>
- <td nowrap>Retry Policy</td>
- <td><strong>@retryPolicy=<em>value</em></strong></td>
- <td>Apply the given retry policy to all queries in the paragraph</td>
- </tr>
- <tr>
<td nowrap>Fetch Size</td>
<td><strong>@fetchSize=<em>integer value</em></strong></td>
<td>Apply the given fetch size to all queries in the paragraph</td>
@@ -389,10 +383,6 @@ Some parameters only accept restricted values:
<td>Any long value</td>
</tr>
<tr>
- <td nowrap>Retry Policy</td>
- <td><strong>DEFAULT, DOWNGRADING_CONSISTENCY, FALLTHROUGH, LOGGING_DEFAULT, LOGGING_DOWNGRADING, LOGGING_FALLTHROUGH</strong></td>
- </tr>
- <tr>
<td nowrap>Fetch Size</td>
<td>Any integer value</td>
</tr>
@@ -494,6 +484,7 @@ Bound values are not mandatory for the **@bind** statement. However if you provi
* Date values should be enclosed between simple quotes (**'**) and respect the formats (full list is in the [documentation](https://docs.datastax.com/en/cql/3.3/cql/cql_reference/timestamp_type_r.html)):
1. yyyy-MM-dd HH:MM:ss
2. yyyy-MM-dd HH:MM:ss.SSS
+ 3. yyyy-MM-dd'T'HH:mm:ss.SSSZ
* **null** is parsed as-is
* **boolean** (`true`|`false`) are parsed as-is
* collection values must follow the **[standard CQL syntax]**:
@@ -584,7 +575,7 @@ The **isolated** mode is the most extreme and will create as many JVM/`com.datas
## Interpreter Configuration
To configure the **Cassandra** interpreter, go to the **Interpreter** menu and scroll down to change the parameters.
-The **Cassandra** interpreter is using the official **[Cassandra Java Driver]** and most of the parameters are used
+The **Cassandra** interpreter uses the official **[DataStax Java Driver for Apache Cassandra]®**, and most of the parameters are used
to configure the Java driver
Below are the configuration parameters and their default values.
@@ -644,10 +635,9 @@ Below are the configuration parameters and their default values.
<tr>
<td>`cassandra.load.balancing.policy`</td>
<td>
- Load balancing policy. Default = `new TokenAwarePolicy(new DCAwareRoundRobinPolicy())`
+ Load balancing policy. Default = `DefaultLoadBalancingPolicy`
To Specify your own policy, provide the <em>fully qualify class name (FQCN)</em> of your policy.
- At runtime the interpreter will instantiate the policy using
- <strong>Class.forName(FQCN)</strong>
+ At runtime the driver will instantiate the policy using its class name.
</td>
<td>DEFAULT</td>
</tr>
@@ -657,12 +647,12 @@ Below are the configuration parameters and their default values.
<td>10</td>
</tr>
<tr>
- <td>`cassandra.pooling.core.connection.per.host.local`</td>
+ <td>`cassandra.pooling.connection.per.host.local`</td>
<td>Protocol V2 and below default = 2. Protocol V3 and above default = 1</td>
<td>2</td>
</tr>
<tr>
- <td>`cassandra.pooling.core.connection.per.host.remote`</td>
+ <td>`cassandra.pooling.connection.per.host.remote`</td>
<td>Protocol V2 and below default = 1. Protocol V3 and above default = 1</td>
<td>1</td>
</tr>
@@ -672,49 +662,19 @@ Below are the configuration parameters and their default values.
<td>30</td>
</tr>
<tr>
- <td>`cassandra.pooling.idle.timeout.seconds`</td>
- <td>Cassandra idle time out in seconds</td>
- <td>120</td>
- </tr>
- <tr>
- <td>`cassandra.pooling.max.connection.per.host.local`</td>
- <td>Protocol V2 and below default = 8. Protocol V3 and above default = 1</td>
- <td>8</td>
- </tr>
- <tr>
- <td>`cassandra.pooling.max.connection.per.host.remote`</td>
- <td>Protocol V2 and below default = 2. Protocol V3 and above default = 1</td>
- <td>2</td>
- </tr>
- <tr>
- <td>`cassandra.pooling.max.request.per.connection.local`</td>
+ <td>`cassandra.pooling.max.request.per.connection`</td>
<td>Protocol V2 and below default = 128. Protocol V3 and above default = 1024</td>
<td>128</td>
</tr>
<tr>
- <td>`cassandra.pooling.max.request.per.connection.remote`</td>
- <td>Protocol V2 and below default = 128. Protocol V3 and above default = 256</td>
- <td>128</td>
- </tr>
- <tr>
- <td>`cassandra.pooling.new.connection.threshold.local`</td>
- <td>Protocol V2 and below default = 100. Protocol V3 and above default = 800</td>
- <td>100</td>
- </tr>
- <tr>
- <td>`cassandra.pooling.new.connection.threshold.remote`</td>
- <td>Protocol V2 and below default = 100. Protocol V3 and above default = 200</td>
- <td>100</td>
- </tr>
- <tr>
<td>`cassandra.pooling.pool.timeout.millisecs`</td>
<td>Cassandra pool time out in millisecs</td>
<td>5000</td>
</tr>
<tr>
<td>`cassandra.protocol.version`</td>
- <td>Cassandra binary protocol version</td>
- <td>4</td>
+ <td>Cassandra binary protocol version (`3`, `4`, `DSE1`, `DSE2`)</td>
+ <td>`DEFAULT` (detected automatically)</td>
</tr>
<tr>
<td>cassandra.query.default.consistency</td>
@@ -743,10 +703,9 @@ Below are the configuration parameters and their default values.
<td>`cassandra.reconnection.policy`</td>
<td>
Cassandra Reconnection Policy.
- Default = `new ExponentialReconnectionPolicy(1000, 10 * 60 * 1000)`
+ Default = `ExponentialReconnectionPolicy`
To Specify your own policy, provide the <em>fully qualify class name (FQCN)</em> of your policy.
- At runtime the interpreter will instantiate the policy using
- <strong>Class.forName(FQCN)</strong>
+ At runtime the driver will instantiate the policy using its class name.
</td>
<td>DEFAULT</td>
</tr>
@@ -754,10 +713,9 @@ Below are the configuration parameters and their default values.
<td>`cassandra.retry.policy`</td>
<td>
Cassandra Retry Policy.
- Default = `DefaultRetryPolicy.INSTANCE`
+ Default = `DefaultRetryPolicy`
To Specify your own policy, provide the <em>fully qualify class name (FQCN)</em> of your policy.
- At runtime the interpreter will instantiate the policy using
- <strong>Class.forName(FQCN)</strong>
+ At runtime the driver will instantiate the policy using its class name.
</td>
<td>DEFAULT</td>
</tr>
@@ -780,10 +738,9 @@ Below are the configuration parameters and their default values.
<td>`cassandra.speculative.execution.policy`</td>
<td>
Cassandra Speculative Execution Policy.
- Default = `NoSpeculativeExecutionPolicy.INSTANCE`
+ Default = `NoSpeculativeExecutionPolicy`
To Specify your own policy, provide the <em>fully qualify class name (FQCN)</em> of your policy.
- At runtime the interpreter will instantiate the policy using
- <strong>Class.forName(FQCN)</strong>
+ At runtime the driver will instantiate the policy using its class name.
</td>
<td>DEFAULT</td>
</tr>
@@ -814,6 +771,15 @@ Below are the configuration parameters and their default values.
## Change Log
+**3.2** _(Zeppelin {{ site.ZEPPELIN_VERSION }})_ :
+
+* Refactor to use the unified Java driver 4.5
+ ([ZEPPELIN-4378](https://issues.apache.org/jira/browse/ZEPPELIN-4378)):
+ * configuration changes were necessary, as the new driver has a different architecture and
+ configuration options;
+ * the interpreter gained support for DSE-specific data types and other extensions;
+ * support for `@retryPolicy` was removed, as only a single retry policy is shipped with the driver.
+
**3.1** _(Zeppelin {{ site.ZEPPELIN_VERSION }})_ :
* Upgrade Java driver to 3.7.2 ([ZEPPELIN-4331](https://issues.apache.org/jira/browse/ZEPPELIN-4331);
@@ -844,14 +810,12 @@ Below are the configuration parameters and their default values.
## Bugs & Contacts
- If you encounter a bug for this interpreter, please create a **[JIRA]** ticket and ping me on Twitter
- at **[@doanduyhai]**
+ If you encounter a bug for this interpreter, please create a **[JIRA]** ticket.
-[Cassandra Java Driver]: https://github.com/datastax/java-driver
+[DataStax Java Driver for Apache Cassandra]: https://docs.datastax.com/en/developer/java-driver/latest/
[standard CQL syntax]: http://docs.datastax.com/en/cql/3.1/cql/cql_using/use_collections_c.html
[Tuple CQL syntax]: http://docs.datastax.com/en/cql/3.1/cql/cql_reference/tupleType.html
[UDT CQL syntax]: http://docs.datastax.com/en/cql/3.1/cql/cql_using/cqlUseUDT.html
[Zeppelin Dynamic Form](../usage/dynamic_form/intro.html)
[Interpreter Binding Mode](../usage/interpreter/interpreter_binding_mode.html)
-[JIRA]: https://issues.apache.org/jira/browse/ZEPPELIN-382?jql=project%20%3D%20ZEPPELIN
-[@doanduyhai]: https://twitter.com/doanduyhai
+[JIRA]: https://issues.apache.org/jira/browse/ZEPPELIN
diff --git a/zeppelin-distribution/src/bin_license/LICENSE b/zeppelin-distribution/src/bin_license/LICENSE
index a861833..386398e 100644
--- a/zeppelin-distribution/src/bin_license/LICENSE
+++ b/zeppelin-distribution/src/bin_license/LICENSE
@@ -68,9 +68,9 @@ The following components are provided under Apache License.
(Apache 2.0) xml apis (xml-apis:xml-apis:jar:1.4.01 - http://xerces.apache.org/xml-commons/components/external)
(Apache 2.0) java-xmlbuilder (com.jamesmurty.utils:java-xmlbuilder:jar:1.0 - https://github.com/jmurty/java-xmlbuilder)
(Apache 2.0) compress-lzf (com.ning:compress-lzf:jar:1.0.3 - https://github.com/ning/compress) Copyright 2009-2010 Ning, Inc.
- (Apache 2.0) java-driver-core (com.datastax.oss:java-driver-core:jar:3.7.2 - https://github.com/datastax/java-driver)
- (Apache 2.0) Snappy-java (org.xerial.snappy:snappy-java:1.1.2.4 - https://github.com/xerial/snappy-java/)
- (Apache 2.0) lz4-java (org.lz4:lz4-java:jar:1.4.1 - https://github.com/lz4/lz4-java)
+ (Apache 2.0) java-driver-core (com.datastax.oss:java-driver-core:jar:4.5.1 - https://github.com/datastax/java-driver)
+ (Apache 2.0) Snappy-java (org.xerial.snappy:snappy-java:1.1.7.3 - https://github.com/xerial/snappy-java/)
+ (Apache 2.0) lz4-java (org.lz4:lz4-java:jar:1.6.0 - https://github.com/lz4/lz4-java)
(Apache 2.0) RoaringBitmap (org.roaringbitmap:RoaringBitmap:jar:0.5.11 - https://github.com/lemire/RoaringBitmap)
(Apache 2.0) json4s (org.json4s:json4s-ast_2.10:jar:3.2.10 - https://github.com/json4s/json4s)
(Apache 2.0) HPPC Collections (com.carrotsearch:hppc:0.7.1 - http://labs.carrotsearch.com/hppc.html/hppc)