Posted to commits@zeppelin.apache.org by al...@apache.org on 2020/05/01 11:28:34 UTC

[zeppelin] branch master updated: [ZEPPELIN-4378] porting Cassandra interpreter to new driver

This is an automated email from the ASF dual-hosted git repository.

alexott pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new 5f6a5d5  [ZEPPELIN-4378] porting Cassandra interpreter to new driver
5f6a5d5 is described below

commit 5f6a5d58fd87812281349ea01cb8b79c75c3ef40
Author: Alex Ott <al...@gmail.com>
AuthorDate: Wed Mar 25 18:12:23 2020 +0100

    [ZEPPELIN-4378] porting Cassandra interpreter to new driver
    
    ### What is this PR for?
    
    This PR refactors the Cassandra interpreter to use the new DataStax Java driver (4.x), whose new features and new architecture required significant changes in the code base. As a result, we get:
    * A supported version of the driver (the 3.x line will only receive critical bug fixes)
    * Support for additional DSE (DataStax Enterprise) functionality: security, additional data types, etc.
    * Improvements in load balancing, etc.
    
    ### What type of PR is it?
    
    Refactoring
    
    ### What is the Jira issue?
    
    * ZEPPELIN-4378
    
    ### How should this be tested?
    
    * Travis CI build: https://travis-ci.org/github/alexott/zeppelin/builds/681878444
    * Tested manually in addition to the unit tests
    
    ### Questions:
    * Do the license files need updating? Done, minor updates
    * Are there breaking changes for older versions? There are renames and deletions of configuration properties, but these don't break existing setups. The only breaking change is the removal of the "standard" retry policies from the syntax; this was necessary because they are no longer shipped with the driver.
    * Does this need documentation? Documentation was updated
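
    As an illustrative sketch of the new configuration model (not part of this commit): in driver 4.x, the pooling settings that survived the rename correspond to entries in the driver's own HOCON configuration, roughly as below. The option paths follow the driver 4.x reference.conf; verify them against the driver version actually in use.

```
# Hypothetical application.conf fragment (DataStax Java driver 4.x HOCON config);
# comments show the corresponding renamed Zeppelin interpreter properties.
datastax-java-driver {
  advanced.connection {
    pool.local.size = 8                 # cassandra.pooling.connection.per.host.local
    pool.remote.size = 2                # cassandra.pooling.connection.per.host.remote
    max-requests-per-connection = 1024  # cassandra.pooling.max.request.per.connection
  }
  # these two are set programmatically in CassandraInterpreter.open():
  basic.load-balancing-policy.class = DcInferringLoadBalancingPolicy
  advanced.resolve-contact-points = false
}
```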
    
    Author: Alex Ott <al...@gmail.com>
    
    Closes #3699 from alexott/ZEPPELIN-4378 and squashes the following commits:
    
    1c9145379 [Alex Ott] [ZEPPELIN-4378] porting Cassandra interpreter to new driver
---
 cassandra/pom.xml                                  |  62 +-
 .../zeppelin/cassandra/CassandraInterpreter.java   | 174 +++--
 .../src/main/resources/interpreter-setting.json    |  68 +-
 .../src/main/resources/scalate/allAggregates.ssp   |   2 +-
 .../src/main/resources/scalate/allFunctions.ssp    |   2 +-
 .../resources/scalate/allMaterializedViews.ssp     |   2 +-
 cassandra/src/main/resources/scalate/allTables.ssp |   2 +-
 cassandra/src/main/resources/scalate/allUDTs.ssp   |   2 +-
 cassandra/src/main/resources/scalate/helpMenu.ssp  |  36 +-
 .../resources/scalate/materializedViewDetails.ssp  |  10 +-
 .../scalate/noResultWithExecutionInfo.ssp          |   7 +-
 .../src/main/resources/scalate/tableDetails.ssp    |  10 +-
 .../src/main/resources/scalate/udtDetails.ssp      |   2 +-
 .../driver/core/TableMetadataWrapper.scala         |   4 +-
 .../zeppelin/cassandra/BoundValuesParser.scala     |   9 +-
 .../apache/zeppelin/cassandra/DisplaySystem.scala  | 491 +++++++-------
 .../zeppelin/cassandra/EnhancedSession.scala       | 215 +++---
 .../zeppelin/cassandra/InterpreterLogic.scala      | 279 ++++----
 .../zeppelin/cassandra/JavaDriverConfig.scala      | 397 +++--------
 .../zeppelin/cassandra/MetaDataHierarchy.scala     |  52 +-
 .../zeppelin/cassandra/ParagraphParser.scala       | 247 +++----
 .../zeppelin/cassandra/TextBlockHierarchy.scala    |  14 +-
 .../scala/compat/java8/OptionConverters.scala      | 140 ++++
 .../cassandra/CassandraInterpreterTest.java        | 255 ++++---
 .../zeppelin/cassandra/InterpreterLogicTest.java   | 103 ++-
 cassandra/src/test/resources/application.conf      |   3 +
 .../{prepare_schema.cql => prepare_all.cql}        |  21 +-
 cassandra/src/test/resources/prepare_data.cql      |  18 -
 .../scalate/DescribeKeyspace_live_data.html        |   2 +-
 .../test/resources/scalate/DescribeKeyspaces.html  |   2 +-
 .../DescribeTable_live_data_complex_table.html     |   2 +-
 .../src/test/resources/scalate/DescribeTables.html | 750 +++++++++++++--------
 .../src/test/resources/scalate/DescribeTypes.html  | 179 +++++
 cassandra/src/test/resources/scalate/Help.html     |   2 +-
 .../scalate/NoResultWithExecutionInfo.html         |  43 +-
 .../zeppelin/cassandra/EnhancedSessionTest.scala   |  18 +-
 .../zeppelin/cassandra/ParagraphParserTest.scala   |  45 +-
 docs/interpreter/cassandra.md                      | 108 +--
 zeppelin-distribution/src/bin_license/LICENSE      |   6 +-
 39 files changed, 1962 insertions(+), 1822 deletions(-)

diff --git a/cassandra/pom.xml b/cassandra/pom.xml
index 6424f6c..6d37359 100644
--- a/cassandra/pom.xml
+++ b/cassandra/pom.xml
@@ -33,15 +33,15 @@
     <description>Zeppelin cassandra support</description>
 
     <properties>
-        <cassandra.driver.version>3.7.2</cassandra.driver.version>
-        <snappy.version>1.1.2.6</snappy.version>
-        <lz4.version>1.4.1</lz4.version>
+        <cassandra.driver.version>4.6.0</cassandra.driver.version>
+        <snappy.version>1.1.7.3</snappy.version>
+        <lz4.version>1.6.0</lz4.version>
+        <commons-lang.version>3.3.2</commons-lang.version>
         <scalate.version>1.7.1</scalate.version>
-        <cassandra.guava.version>19.0</cassandra.guava.version>
 
         <!-- test library versions -->
-        <achilles.version>3.2.4-Zeppelin</achilles.version>
         <jna.version>4.2.0</jna.version>
+        <cassandra.unit.version>4.3.1.0</cassandra.unit.version>
 
         <interpreter.name>cassandra</interpreter.name>
     </properties>
@@ -52,7 +52,7 @@
             <groupId>org.apache.zeppelin</groupId>
             <artifactId>zeppelin-interpreter</artifactId>
             <version>${project.version}</version>
-            <scope>test</scope>
+            <scope>provided</scope>
             <exclusions>
                 <exclusion>
                     <groupId>org.hamcrest</groupId>
@@ -62,17 +62,11 @@
         </dependency>
 
         <dependency>
-            <groupId>com.datastax.cassandra</groupId>
-            <artifactId>cassandra-driver-core</artifactId>
+            <groupId>com.datastax.oss</groupId>
+            <artifactId>java-driver-core</artifactId>
             <version>${cassandra.driver.version}</version>
         </dependency>
 
-        <dependency>
-            <groupId>com.google.guava</groupId>
-            <artifactId>guava</artifactId>
-            <version>${cassandra.guava.version}</version>
-        </dependency>
-
         <!-- Compression libraries for the cassandra-driver protocol. -->
         <!-- Include both compression options to make to simplify deployment. -->
 
@@ -109,6 +103,13 @@
             <scope>runtime</scope>
         </dependency>
 
+        <!-- we'll switch back to it after removing support for Scala 2.10
+        <dependency>
+            <groupId>org.scala-lang.modules</groupId>
+            <artifactId>scala-java8-compat_${scala.binary.version}</artifactId>
+            <version>${scala.java8.compat.version}</version>
+        </dependency> -->
+
         <dependency>
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-lang3</artifactId>
@@ -156,30 +157,19 @@
         </dependency>
 
         <dependency>
-            <groupId>info.archinnov</groupId>
-            <artifactId>achilles-embedded</artifactId>
-            <version>${achilles.version}</version>
-            <scope>test</scope>
-            <exclusions>
-                <exclusion>
-                    <groupId>ch.qos.logback</groupId>
-                    <artifactId>logback-core</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>ch.qos.logback</groupId>
-                    <artifactId>logback-classic</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>net.java.dev.jna</groupId>
-                    <artifactId>jna</artifactId>
-                </exclusion>
-                <exclusion>
-                  <groupId>org.slf4j</groupId>
-                  <artifactId>log4j-over-slf4j</artifactId>
-                </exclusion>
-            </exclusions>
+          <groupId>org.cassandraunit</groupId>
+          <artifactId>cassandra-unit</artifactId>
+          <version>${cassandra.unit.version}</version>
+          <scope>test</scope>
+          <exclusions>
+            <exclusion>
+              <groupId>com.datastax.oss</groupId>
+              <artifactId>java-driver-core</artifactId>
+            </exclusion>
+          </exclusions>
         </dependency>
 
+        
         <dependency>
             <groupId>org.mockito</groupId>
             <artifactId>mockito-core</artifactId>
diff --git a/cassandra/src/main/java/org/apache/zeppelin/cassandra/CassandraInterpreter.java b/cassandra/src/main/java/org/apache/zeppelin/cassandra/CassandraInterpreter.java
index 105e271..682bf2d 100644
--- a/cassandra/src/main/java/org/apache/zeppelin/cassandra/CassandraInterpreter.java
+++ b/cassandra/src/main/java/org/apache/zeppelin/cassandra/CassandraInterpreter.java
@@ -16,33 +16,36 @@
  */
 package org.apache.zeppelin.cassandra;
 
-import static java.lang.Integer.parseInt;
-
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.CqlSessionBuilder;
+import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
+import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
+import com.datastax.oss.driver.api.core.config.ProgrammaticDriverConfigLoaderBuilder;
+import com.datastax.oss.driver.internal.core.loadbalancing.DcInferringLoadBalancingPolicy;
+import com.datastax.oss.driver.shaded.guava.common.net.InetAddresses;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
+import org.apache.zeppelin.scheduler.Scheduler;
+import org.apache.zeppelin.scheduler.SchedulerFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManagerFactory;
 import java.io.InputStream;
+import java.net.InetSocketAddress;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.security.KeyStore;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.Properties;
 
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.TrustManagerFactory;
-
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.JdkSSLOptions;
-import com.datastax.driver.core.ProtocolOptions.Compression;
-import com.datastax.driver.core.Session;
-
-import org.apache.zeppelin.interpreter.Interpreter;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterResult;
-import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
-import org.apache.zeppelin.scheduler.Scheduler;
-import org.apache.zeppelin.scheduler.SchedulerFactory;
+import static java.lang.Integer.parseInt;
 
 /**
  * Interpreter for Apache Cassandra CQL query language.
@@ -68,24 +71,12 @@ public class CassandraInterpreter extends Interpreter {
           "cassandra.speculative.execution.policy";
   public static final String CASSANDRA_MAX_SCHEMA_AGREEMENT_WAIT_SECONDS =
           "cassandra.max.schema.agreement.wait.second";
-  public static final String CASSANDRA_POOLING_NEW_CONNECTION_THRESHOLD_LOCAL =
-          "cassandra.pooling.new.connection.threshold.local";
-  public static final String CASSANDRA_POOLING_NEW_CONNECTION_THRESHOLD_REMOTE =
-          "cassandra.pooling.new.connection.threshold.remote";
-  public static final String CASSANDRA_POOLING_MAX_CONNECTION_PER_HOST_LOCAL =
-          "cassandra.pooling.max.connection.per.host.local";
-  public static final String CASSANDRA_POOLING_MAX_CONNECTION_PER_HOST_REMOTE =
-          "cassandra.pooling.max.connection.per.host.remote";
-  public static final String CASSANDRA_POOLING_CORE_CONNECTION_PER_HOST_LOCAL =
-          "cassandra.pooling.core.connection.per.host.local";
-  public static final String CASSANDRA_POOLING_CORE_CONNECTION_PER_HOST_REMOTE =
-          "cassandra.pooling.core.connection.per.host.remote";
-  public static final String CASSANDRA_POOLING_MAX_REQUESTS_PER_CONNECTION_LOCAL =
-          "cassandra.pooling.max.request.per.connection.local";
-  public static final String CASSANDRA_POOLING_MAX_REQUESTS_PER_CONNECTION_REMOTE =
-          "cassandra.pooling.max.request.per.connection.remote";
-  public static final String CASSANDRA_POOLING_IDLE_TIMEOUT_SECONDS =
-          "cassandra.pooling.idle.timeout.seconds";
+  public static final String CASSANDRA_POOLING_CONNECTION_PER_HOST_LOCAL =
+          "cassandra.pooling.connection.per.host.local";
+  public static final String CASSANDRA_POOLING_CONNECTION_PER_HOST_REMOTE =
+          "cassandra.pooling.connection.per.host.remote";
+  public static final String CASSANDRA_POOLING_MAX_REQUESTS_PER_CONNECTION =
+          "cassandra.pooling.max.request.per.connection";
   public static final String CASSANDRA_POOLING_POOL_TIMEOUT_MILLIS =
           "cassandra.pooling.pool.timeout.millisecs";
   public static final String CASSANDRA_POOLING_HEARTBEAT_INTERVAL_SECONDS =
@@ -122,24 +113,15 @@ public class CassandraInterpreter extends Interpreter {
           "cassandra.ssl.truststore.password";
 
 
-  public static final String DEFAULT_HOST = "localhost";
+  public static final String DEFAULT_HOST = "127.0.0.1";
   public static final String DEFAULT_PORT = "9042";
-  public static final String DEFAULT_CLUSTER = "Test Cluster";
   public static final String DEFAULT_KEYSPACE = "system";
-  public static final String DEFAULT_PROTOCOL_VERSION = "4";
-  public static final String DEFAULT_COMPRESSION = "NONE";
-  public static final String DEFAULT_CREDENTIAL = "none";
+  public static final String DEFAULT_PROTOCOL_VERSION = "DEFAULT";
+  public static final String DEFAULT_COMPRESSION = "none";
+  public static final String DEFAULT_CONNECTIONS_PER_HOST = "1";
+  public static final String DEFAULT_MAX_REQUEST_PER_CONNECTION = "1024";
   public static final String DEFAULT_POLICY = "DEFAULT";
   public static final String DEFAULT_PARALLELISM = "10";
-  static String defaultNewConnectionThresholdLocal = "100";
-  static String defaultNewConnectionThresholdRemote = "100";
-  static String defaultCoreConnectionPerHostLocal = "2";
-  static String defaultCoreConnectionPerHostRemote = "1";
-  static String defaultMaxConnectionPerHostLocal = "8";
-  static String defaultMaxConnectionPerHostRemote = "2";
-  static String defaultMaxRequestPerConnectionLocal = "1024";
-  static String defaultMaxRequestPerConnectionRemote = "256";
-  public static final String DEFAULT_IDLE_TIMEOUT = "120";
   public static final String DEFAULT_POOL_TIMEOUT = "5000";
   public static final String DEFAULT_HEARTBEAT_INTERVAL = "30";
   public static final String DEFAULT_CONSISTENCY = "ONE";
@@ -148,19 +130,12 @@ public class CassandraInterpreter extends Interpreter {
   public static final String DEFAULT_CONNECTION_TIMEOUT = "5000";
   public static final String DEFAULT_READ_TIMEOUT = "12000";
   public static final String DEFAULT_TCP_NO_DELAY = "true";
+  public static final String DEFAULT_MAX_SCHEMA_AGREEMENT_WAIT_SECONDS = "12";
 
-  public static final String DOWNGRADING_CONSISTENCY_RETRY = "DOWNGRADING_CONSISTENCY";
-  public static final String FALLTHROUGH_RETRY = "FALLTHROUGH";
-  public static final String LOGGING_DEFAULT_RETRY = "LOGGING_DEFAULT";
-  public static final String LOGGING_DOWNGRADING_RETRY = "LOGGING_DOWNGRADING";
-  public static final String LOGGING_FALLTHROUGH_RETRY = "LOGGING_FALLTHROUGH";
-
-  public static final List NO_COMPLETION = new ArrayList<>();
+  static final List NO_COMPLETION = new ArrayList<>();
 
   InterpreterLogic helper;
-  Cluster.Builder clusterBuilder;
-  Cluster cluster;
-  Session session;
+  CqlSession session;
   private JavaDriverConfig driverConfig = new JavaDriverConfig();
 
   public CassandraInterpreter(Properties properties) {
@@ -170,35 +145,58 @@ public class CassandraInterpreter extends Interpreter {
   @Override
   public void open() {
 
-    final String[] addresses = getProperty(CASSANDRA_HOSTS).split(",");
-    final int port = parseInt(getProperty(CASSANDRA_PORT));
-    StringBuilder hosts = new StringBuilder();
+    final String[] addresses = getProperty(CASSANDRA_HOSTS, DEFAULT_HOST).split(",");
+    final int port = parseInt(getProperty(CASSANDRA_PORT, DEFAULT_PORT));
+    Collection<InetSocketAddress> hosts = new ArrayList<>();
     for (String address : addresses) {
-      hosts.append(address).append(",");
+      if (InetAddresses.isInetAddress(address)) {
+        hosts.add(new InetSocketAddress(address, port));
+      } else {
+        // TODO(alex): maybe it won't be necessary in 4.4
+        hosts.add(InetSocketAddress.createUnresolved(address, port));
+      }
     }
 
-    LOGGER.info("Bootstrapping Cassandra Java Driver to connect to " + hosts.toString() +
-            "on port " + port);
-
-    Compression compression = driverConfig.getCompressionProtocol(this);
-
-    clusterBuilder = Cluster.builder()
-            .addContactPoints(addresses)
-            .withPort(port)
-            .withProtocolVersion(driverConfig.getProtocolVersion(this))
-            .withClusterName(getProperty(CASSANDRA_CLUSTER_NAME))
-            .withCompression(compression)
-            .withCredentials(getProperty(CASSANDRA_CREDENTIALS_USERNAME),
+    LOGGER.info("Bootstrapping Cassandra Java Driver to connect to " +
+            getProperty(CASSANDRA_HOSTS) + "on port " + port);
+
+    // start generation of the config
+    ProgrammaticDriverConfigLoaderBuilder configBuilder = DriverConfigLoader.programmaticBuilder();
+
+    driverConfig.setCompressionProtocol(this, configBuilder);
+    driverConfig.setPoolingOptions(this, configBuilder);
+    driverConfig.setProtocolVersion(this, configBuilder);
+    driverConfig.setQueryOptions(this, configBuilder);
+    driverConfig.setReconnectionPolicy(this, configBuilder);
+    driverConfig.setRetryPolicy(this, configBuilder);
+    driverConfig.setSocketOptions(this, configBuilder);
+    driverConfig.setSpeculativeExecutionPolicy(this, configBuilder);
+
+    //
+    configBuilder.withClass(DefaultDriverOption.LOAD_BALANCING_POLICY_CLASS,
+            DcInferringLoadBalancingPolicy.class);
+    configBuilder.withBoolean(DefaultDriverOption.RESOLVE_CONTACT_POINTS, false);
+
+    configBuilder.withInt(DefaultDriverOption.CONTROL_CONNECTION_AGREEMENT_TIMEOUT,
+            parseInt(getProperty(CASSANDRA_MAX_SCHEMA_AGREEMENT_WAIT_SECONDS,
+                    DEFAULT_MAX_SCHEMA_AGREEMENT_WAIT_SECONDS)));
+
+    DriverConfigLoader loader = configBuilder.endProfile().build();
+    // TODO(alex): think how to dump built configuration...
+    logger.debug(loader.toString());
+    // end generation of config
+
+    CqlSessionBuilder clusterBuilder = CqlSession.builder()
+            .addContactPoints(hosts)
+            .withAuthCredentials(getProperty(CASSANDRA_CREDENTIALS_USERNAME),
                     getProperty(CASSANDRA_CREDENTIALS_PASSWORD))
-            .withLoadBalancingPolicy(driverConfig.getLoadBalancingPolicy(this))
-            .withRetryPolicy(driverConfig.getRetryPolicy(this))
-            .withReconnectionPolicy(driverConfig.getReconnectionPolicy(this))
-            .withSpeculativeExecutionPolicy(driverConfig.getSpeculativeExecutionPolicy(this))
-            .withMaxSchemaAgreementWaitSeconds(
-                    parseInt(getProperty(CASSANDRA_MAX_SCHEMA_AGREEMENT_WAIT_SECONDS)))
-            .withPoolingOptions(driverConfig.getPoolingOptions(this))
-            .withQueryOptions(driverConfig.getQueryOptions(this))
-            .withSocketOptions(driverConfig.getSocketOptions(this));
+            .withApplicationName("")
+            .withApplicationVersion("");
+
+    String keyspace = getProperty(CASSANDRA_KEYSPACE_NAME, DEFAULT_KEYSPACE);
+    if (StringUtils.isNotBlank(keyspace) && !DEFAULT_KEYSPACE.equalsIgnoreCase(keyspace)) {
+      clusterBuilder.withKeyspace(keyspace);
+    }
 
     final String runWithSSL = getProperty(CASSANDRA_WITH_SSL);
     if (runWithSSL != null && runWithSSL.equals("true")) {
@@ -219,9 +217,7 @@ public class CassandraInterpreter extends Interpreter {
           sslContext = SSLContext.getInstance("TLS");
           sslContext.init(null, trustManagerFactory.getTrustManagers(), null);
         }
-        clusterBuilder = clusterBuilder.withSSL(JdkSSLOptions.builder()
-                .withSSLContext(sslContext)
-                .build());
+        clusterBuilder = clusterBuilder.withSslContext(sslContext);
       } catch (Exception e) {
         LOGGER.error(e.toString());
       }
@@ -229,15 +225,13 @@ public class CassandraInterpreter extends Interpreter {
       LOGGER.debug("Cassandra Interpreter: Not using SSL");
     }
 
-    cluster = clusterBuilder.build();
-    session = cluster.connect();
+    session = clusterBuilder.withConfigLoader(loader).build();
     helper = new InterpreterLogic(session);
   }
 
   @Override
   public void close() {
     session.close();
-    cluster.close();
   }
 
   @Override
@@ -269,6 +263,6 @@ public class CassandraInterpreter extends Interpreter {
   public Scheduler getScheduler() {
     return SchedulerFactory.singleton()
             .createOrGetParallelScheduler(CassandraInterpreter.class.getName() + this.hashCode(),
-                    parseInt(getProperty(CASSANDRA_INTERPRETER_PARALLELISM)));
+                    parseInt(getProperty(CASSANDRA_INTERPRETER_PARALLELISM, DEFAULT_PARALLELISM)));
   }
 }
diff --git a/cassandra/src/main/resources/interpreter-setting.json b/cassandra/src/main/resources/interpreter-setting.json
index 0f0d58c..f8d2951 100644
--- a/cassandra/src/main/resources/interpreter-setting.json
+++ b/cassandra/src/main/resources/interpreter-setting.json
@@ -64,28 +64,28 @@
         "envName": null,
         "propertyName": "cassandra.load.balancing.policy",
         "defaultValue": "DEFAULT",
-        "description": "Cassandra Load Balancing Policy. Default = new TokenAwarePolicy(new DCAwareRoundRobinPolicy())",
+        "description": "Class name for Load Balancing Policy. Default = DefaultLoadBalancingPolicy",
         "type": "string"
       },
       "cassandra.retry.policy": {
         "envName": null,
         "propertyName": "cassandra.retry.policy",
         "defaultValue": "DEFAULT",
-        "description": "Cassandra Retry Policy. Default = DefaultRetryPolicy.INSTANCE",
+        "description": "Class name for Retry Policy. Default = DefaultRetryPolicy",
         "type": "string"
       },
       "cassandra.reconnection.policy": {
         "envName": null,
         "propertyName": "cassandra.reconnection.policy",
         "defaultValue": "DEFAULT",
-        "description": "Cassandra Reconnection Policy. Default = new ExponentialReconnectionPolicy(1000, 10 * 60 * 1000)",
+        "description": "Class name for Reconnection Policy. Default = ExponentialReconnectionPolicy",
         "type": "string"
       },
       "cassandra.speculative.execution.policy": {
         "envName": null,
         "propertyName": "cassandra.speculative.execution.policy",
         "defaultValue": "DEFAULT",
-        "description": "Cassandra Speculative Execution Policy. Default = NoSpeculativeExecutionPolicy.INSTANCE",
+        "description": "Class name for Speculative Execution Policy. Default = NoSpeculativeExecutionPolicy",
         "type": "string"
       },
       "cassandra.interpreter.parallelism": {
@@ -102,67 +102,25 @@
         "description": "Cassandra max schema agreement wait in second.Default = ProtocolOptions.DEFAULT_MAX_SCHEMA_AGREEMENT_WAIT_SECONDS",
         "type": "number"
       },
-      "cassandra.pooling.new.connection.threshold.local": {
+      "cassandra.pooling.connection.per.host.local": {
         "envName": null,
-        "propertyName": "cassandra.pooling.new.connection.threshold.local",
-        "defaultValue": "100",
-        "description": "Cassandra new connection threshold local. Protocol V2 and below default = 100 Protocol V3 and above default = 800",
-        "type": "number"
-      },
-      "cassandra.pooling.new.connection.threshold.remote": {
-        "envName": null,
-        "propertyName": "cassandra.pooling.new.connection.threshold.remote",
-        "defaultValue": "100",
-        "description": "Cassandra new connection threshold remove. Protocol V2 and below default = 100 Protocol V3 and above default = 200",
-        "type": "number"
-      },
-      "cassandra.pooling.core.connection.per.host.local": {
-        "envName": null,
-        "propertyName": "cassandra.pooling.core.connection.per.host.local",
-        "defaultValue": "2",
-        "description": "Cassandra core connection per host local. Protocol V2 and below default = 2 Protocol V3 and above default = 1",
-        "type": "number"
-      },
-      "cassandra.pooling.core.connection.per.host.remote": {
-        "envName": null,
-        "propertyName": "cassandra.pooling.core.connection.per.host.remote",
-        "defaultValue": "1",
-        "description": "Cassandra core connection per host remove. Protocol V2 and below default = 1 Protocol V3 and above default = 1",
-        "type": "number"
-      },
-      "cassandra.pooling.max.connection.per.host.local": {
-        "envName": null,
-        "propertyName": "cassandra.pooling.max.connection.per.host.local",
+        "propertyName": "cassandra.pooling.connection.per.host.local",
         "defaultValue": "8",
-        "description": "Cassandra max connection per host local. Protocol V2 and below default = 8 Protocol V3 and above default = 1",
+        "description": "Cassandra connections per host local. Protocol V2 and below default = 8 Protocol V3 and above default = 1",
         "type": "number"
       },
-      "cassandra.pooling.max.connection.per.host.remote": {
+      "cassandra.pooling.connection.per.host.remote": {
         "envName": null,
-        "propertyName": "cassandra.pooling.max.connection.per.host.remote",
+        "propertyName": "cassandra.pooling.connection.per.host.remote",
         "defaultValue": "2",
-        "description": "Cassandra max connection per host remote. Protocol V2 and below default = 2 Protocol V3 and above default = 1",
+        "description": "Cassandra connections per host remote. Protocol V2 and below default = 2 Protocol V3 and above default = 1",
         "type": "number"
       },
-      "cassandra.pooling.max.request.per.connection.local": {
+      "cassandra.pooling.max.request.per.connection": {
         "envName": null,
-        "propertyName": "cassandra.pooling.max.request.per.connection.local",
+        "propertyName": "cassandra.pooling.max.request.per.connection",
         "defaultValue": "1024",
-        "description": "Cassandra max request per connection local. Protocol V2 and below default = 128 Protocol V3 and above default = 1024",
-        "type": "number"
-      },
-      "cassandra.pooling.max.request.per.connection.remote": {
-        "envName": null,
-        "propertyName": "cassandra.pooling.max.request.per.connection.remote",
-        "defaultValue": "256",
-        "description": "Cassandra max request per connection remote. Protocol V2 and below default = 128 Protocol V3 and above default = 256",
-        "type": "number"
-      },
-      "cassandra.pooling.idle.timeout.seconds": {
-        "envName": null,
-        "propertyName": "cassandra.pooling.idle.timeout.seconds",
-        "defaultValue": "120",
-        "description": "Cassandra idle time out in seconds. Default = 120",
+        "description": "Cassandra max requests per connection. Protocol V2 and below default = 128 Protocol V3 and above default = 1024",
         "type": "number"
       },
       "cassandra.pooling.pool.timeout.millisecs": {
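
Taken together, the interpreter-setting.json hunk above collapses nine 3.x pooling properties into three. As a summary of the mapping (old property on the left; "dropped" means the 4.x driver no longer exposes that knob, since its connection pools are fixed-size):

```
cassandra.pooling.new.connection.threshold.local    -> dropped
cassandra.pooling.new.connection.threshold.remote   -> dropped
cassandra.pooling.core.connection.per.host.local    -> cassandra.pooling.connection.per.host.local
cassandra.pooling.core.connection.per.host.remote   -> cassandra.pooling.connection.per.host.remote
cassandra.pooling.max.connection.per.host.local     -> cassandra.pooling.connection.per.host.local
cassandra.pooling.max.connection.per.host.remote    -> cassandra.pooling.connection.per.host.remote
cassandra.pooling.max.request.per.connection.local  -> cassandra.pooling.max.request.per.connection
cassandra.pooling.max.request.per.connection.remote -> cassandra.pooling.max.request.per.connection
cassandra.pooling.idle.timeout.seconds              -> dropped
```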
diff --git a/cassandra/src/main/resources/scalate/allAggregates.ssp b/cassandra/src/main/resources/scalate/allAggregates.ssp
index f093c50..e806d73 100644
--- a/cassandra/src/main/resources/scalate/allAggregates.ssp
+++ b/cassandra/src/main/resources/scalate/allAggregates.ssp
@@ -19,7 +19,7 @@
 
 #import(org.apache.zeppelin.cassandra.MetaDataHierarchy._)
 #import(java.util.UUID)
-<%@ val allAggregates: Map[(UUID, String), List[AggregateSummary]] %>
+<%@ val allAggregates: Map[(UUID, String), Seq[AggregateSummary]] %>
 
 <div class="container">
 
diff --git a/cassandra/src/main/resources/scalate/allFunctions.ssp b/cassandra/src/main/resources/scalate/allFunctions.ssp
index fc756a4..e026687 100644
--- a/cassandra/src/main/resources/scalate/allFunctions.ssp
+++ b/cassandra/src/main/resources/scalate/allFunctions.ssp
@@ -19,7 +19,7 @@
 
 #import(org.apache.zeppelin.cassandra.MetaDataHierarchy._)
 #import(java.util.UUID)
-<%@ val allFunctions: Map[(UUID, String), List[FunctionSummary]] %>
+<%@ val allFunctions: Map[(UUID, String), Seq[FunctionSummary]] %>
 
 <div class="container">
 
diff --git a/cassandra/src/main/resources/scalate/allMaterializedViews.ssp b/cassandra/src/main/resources/scalate/allMaterializedViews.ssp
index 8ffee57..58efdb4 100644
--- a/cassandra/src/main/resources/scalate/allMaterializedViews.ssp
+++ b/cassandra/src/main/resources/scalate/allMaterializedViews.ssp
@@ -18,7 +18,7 @@
 --%>
 #import(org.apache.zeppelin.cassandra.MetaDataHierarchy._)
 #import(java.util.UUID)
-<%@ val allMVs: Map[(UUID,String),List[MaterializedViewSummary]] %>
+<%@ val allMVs: Map[(UUID,String),Seq[MaterializedViewSummary]] %>
 <div class="container">
 
     <div class="row">
diff --git a/cassandra/src/main/resources/scalate/allTables.ssp b/cassandra/src/main/resources/scalate/allTables.ssp
index 810881b..0464325 100644
--- a/cassandra/src/main/resources/scalate/allTables.ssp
+++ b/cassandra/src/main/resources/scalate/allTables.ssp
@@ -18,7 +18,7 @@
 --%>
 #import(org.apache.zeppelin.cassandra.MetaDataHierarchy._)
 #import(java.util.UUID)
-<%@ val allTables: Map[(UUID,String),List[String]] %>
+<%@ val allTables: Map[(UUID,String),Seq[String]] %>
 <div class="container">
 
     <div class="row">
diff --git a/cassandra/src/main/resources/scalate/allUDTs.ssp b/cassandra/src/main/resources/scalate/allUDTs.ssp
index 559ef41..0fa40ee 100644
--- a/cassandra/src/main/resources/scalate/allUDTs.ssp
+++ b/cassandra/src/main/resources/scalate/allUDTs.ssp
@@ -18,7 +18,7 @@
 --%>
 #import(org.apache.zeppelin.cassandra.MetaDataHierarchy._)
 #import(java.util.UUID)
-<%@ val allUDTs: Map[(UUID,String),List[String]] %>
+<%@ val allUDTs: Map[(UUID,String),Seq[String]] %>
 <div class="container">
 
     <div class="row">
diff --git a/cassandra/src/main/resources/scalate/helpMenu.ssp b/cassandra/src/main/resources/scalate/helpMenu.ssp
index c61a68f..c40c81a 100644
--- a/cassandra/src/main/resources/scalate/helpMenu.ssp
+++ b/cassandra/src/main/resources/scalate/helpMenu.ssp
@@ -19,17 +19,17 @@
 
 #import(java.util.UUID)
 
-#import(com.datastax.driver.core.utils.UUIDs)
-
-<%@ val basicCommandsId: UUID = UUIDs.random() %>
-<%@ val schemaDiscoveryId: UUID = UUIDs.random() %>
-<%@ val queryParamsId: UUID = UUIDs.random() %>
-<%@ val preparedStatementsId: UUID = UUIDs.random() %>
-<%@ val dynamicFormsId: UUID = UUIDs.random() %>
-<%@ val configurationId: UUID = UUIDs.random() %>
-<%@ val sharedStatesId: UUID = UUIDs.random() %>
-<%@ val changelogId: UUID = UUIDs.random() %>
-<%@ val contactsId: UUID = UUIDs.random() %>
+#import(com.datastax.oss.driver.api.core.uuid.Uuids)
+
+<%@ val basicCommandsId: UUID = Uuids.random() %>
+<%@ val schemaDiscoveryId: UUID = Uuids.random() %>
+<%@ val queryParamsId: UUID = Uuids.random() %>
+<%@ val preparedStatementsId: UUID = Uuids.random() %>
+<%@ val dynamicFormsId: UUID = Uuids.random() %>
+<%@ val configurationId: UUID = Uuids.random() %>
+<%@ val sharedStatesId: UUID = Uuids.random() %>
+<%@ val changelogId: UUID = Uuids.random() %>
+<%@ val contactsId: UUID = Uuids.random() %>
 
 <br/>
 <br/>
@@ -463,11 +463,6 @@
                                     </td>
                                 </tr>
                                 <tr>
-                                    <td>Retry Policy</td>
-                                    <td><strong>@retryPolicy=<em>value</em></strong></td>
-                                    <td>Apply the given retry policy to all queries in the paragraph</td>
-                                </tr>
-                                <tr>
                                     <td>Fetch Size</td>
                                     <td><strong>@fetchSize=<em>int value</em></strong></td>
                                     <td>Apply the given fetch size to all queries in the paragraph</td>
@@ -507,15 +502,6 @@
                                     <td>Any long value</td>
                                 </tr>
                                 <tr>
-                                    <td>Retry Policy</td>
-                                    <td>
-                                        <strong>
-                                            DEFAULT, DOWNGRADING_CONSISTENCY, FALLTHROUGH, LOGGING_DEFAULT,
-                                            LOGGING_DOWNGRADING, LOGGING_FALLTHROUGH
-                                        </strong>
-                                    </td>
-                                </tr>
-                                <tr>
                                     <td>Fetch Size</td>
                                     <td>Any integer value</td>
                                 </tr>
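The rows removed above reflect that driver 4.x no longer ships the downgrading and logging retry policies, and retry behavior moves out of per-paragraph options into the driver configuration. A sketch of the 4.x-style override, assuming the driver's standard `application.conf` mechanism (the class name shown is the driver's default policy):

```
# Driver 4.x configures retries in application.conf instead of per query;
# DefaultRetryPolicy is the only policy shipped with the driver.
datastax-java-driver {
  advanced.retry-policy.class = DefaultRetryPolicy
}
```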
diff --git a/cassandra/src/main/resources/scalate/materializedViewDetails.ssp b/cassandra/src/main/resources/scalate/materializedViewDetails.ssp
index 459b5bc8..182776a 100644
--- a/cassandra/src/main/resources/scalate/materializedViewDetails.ssp
+++ b/cassandra/src/main/resources/scalate/materializedViewDetails.ssp
@@ -52,7 +52,7 @@
                             <i class="glyphicon glyphicon-fullscreen" title="Partition Key"/>
                         </td>
                         <td class="col-md-4">${column.name}</td>
-                        <td class="col-md-4">${column.dataType}</td>
+                        <td class="col-md-4">${column.dataType.asCql(true,true)}</td>
                     </tr>
                         #case(StaticColumn)
                     <tr class="warning">
@@ -60,7 +60,7 @@
                             <i class="glyphicon glyphicon-pushpin" title="Static Column"/>
                         </td>
                         <td class="col-md-4">${column.name}</td>
-                        <td class="col-md-4">${column.dataType}</td>
+                        <td class="col-md-4">${column.dataType.asCql(true,true)}</td>
                     </tr>
                         #case(ClusteringColumn(ASC))
                     <tr class="success">
@@ -70,7 +70,7 @@
                             <i class="glyphicon glyphicon-sort-by-attributes" title="Sort ASC"/>
                         </td>
                         <td class="col-md-4">${column.name}</td>
-                        <td class="col-md-4">${column.dataType}</td>
+                        <td class="col-md-4">${column.dataType.asCql(true,true)}</td>
                     </tr>
                         #case(ClusteringColumn(DESC))
                     <tr class="success">
@@ -80,13 +80,13 @@
                             <i class="glyphicon glyphicon-sort-by-attributes-alt" title="Sort DESC"/>
                         </td>
                         <td class="col-md-4">${column.name}</td>
-                        <td class="col-md-4">${column.dataType}</td>
+                        <td class="col-md-4">${column.dataType.asCql(true,true)}</td>
                     </tr>
                         #otherwise
                     <tr>
                         <td class="col-md-4"></td>
                         <td class="col-md-4">${column.name}</td>
-                        <td class="col-md-4">${column.dataType}</td>
+                        <td class="col-md-4">${column.dataType.asCql(true,true)}</td>
                     </tr>
                         #end
                     #end
diff --git a/cassandra/src/main/resources/scalate/noResultWithExecutionInfo.ssp b/cassandra/src/main/resources/scalate/noResultWithExecutionInfo.ssp
index 1f709a4..465558a 100644
--- a/cassandra/src/main/resources/scalate/noResultWithExecutionInfo.ssp
+++ b/cassandra/src/main/resources/scalate/noResultWithExecutionInfo.ssp
@@ -17,7 +17,6 @@
 */
 --%>
 <%@ val query: String%>
-<%@ val consistency: String%>
 <%@ val triedHosts: String%>
 <%@ val queriedHosts: String%>
 <%@ val schemaInAgreement: String%>
@@ -43,10 +42,6 @@
                     <td>${query}</td>
                 </tr>
                 <tr>
-                    <td>Achieved Consistency</td>
-                    <td>${consistency}</td>
-                </tr>
-                <tr>
                     <td>Tried Hosts</td>
                     <td>${triedHosts}</td>
                 </tr>
@@ -55,7 +50,7 @@
                     <td>${queriedHosts}</td>
                 </tr>
                 <tr>
-                    <td>Schema In Agreement</td>
+                    <td>Schema in Agreement</td>
                     <td>${schemaInAgreement}</td>
                 </tr>
             </tbody>
diff --git a/cassandra/src/main/resources/scalate/tableDetails.ssp b/cassandra/src/main/resources/scalate/tableDetails.ssp
index 41e4517..0938c8e 100644
--- a/cassandra/src/main/resources/scalate/tableDetails.ssp
+++ b/cassandra/src/main/resources/scalate/tableDetails.ssp
@@ -44,7 +44,7 @@
                             <i class="glyphicon glyphicon-fullscreen" title="Partition Key"/>
                         </td>
                         <td class="col-md-4">${column.name}</td>
-                        <td class="col-md-4">${column.dataType}</td>
+                        <td class="col-md-4">${column.dataType.asCql(true,true)}</td>
                     </tr>
                         #case(StaticColumn)
                     <tr class="warning">
@@ -52,7 +52,7 @@
                             <i class="glyphicon glyphicon-pushpin" title="Static Column"/>
                         </td>
                         <td class="col-md-4">${column.name}</td>
-                        <td class="col-md-4">${column.dataType}</td>
+                        <td class="col-md-4">${column.dataType.asCql(true,true)}</td>
                     </tr>
                         #case(ClusteringColumn(ASC))
                     <tr class="success">
@@ -62,7 +62,7 @@
                             <i class="glyphicon glyphicon-sort-by-attributes" title="Sort ASC"/>
                         </td>
                         <td class="col-md-4">${column.name}</td>
-                        <td class="col-md-4">${column.dataType}</td>
+                        <td class="col-md-4">${column.dataType.asCql(true,true)}</td>
                     </tr>
                         #case(ClusteringColumn(DESC))
                     <tr class="success">
@@ -72,13 +72,13 @@
                             <i class="glyphicon glyphicon-sort-by-attributes-alt" title="Sort DESC"/>
                         </td>
                         <td class="col-md-4">${column.name}</td>
-                        <td class="col-md-4">${column.dataType}</td>
+                        <td class="col-md-4">${column.dataType.asCql(true,true)}</td>
                     </tr>
                         #otherwise
                     <tr>
                         <td class="col-md-4"></td>
                         <td class="col-md-4">${column.name}</td>
-                        <td class="col-md-4">${column.dataType}</td>
+                        <td class="col-md-4">${column.dataType.asCql(true,true)}</td>
                     </tr>
                         #end
                     #end
diff --git a/cassandra/src/main/resources/scalate/udtDetails.ssp b/cassandra/src/main/resources/scalate/udtDetails.ssp
index 28858f2..804033c 100644
--- a/cassandra/src/main/resources/scalate/udtDetails.ssp
+++ b/cassandra/src/main/resources/scalate/udtDetails.ssp
@@ -39,7 +39,7 @@
 
                 <tr>
                     <td class="col-md-6">${column.name}</td>
-                    <td class="col-md-6">${column.dataType}</td>
+                    <td class="col-md-6">${column.dataType.asCql(true,true)}</td>
                 </tr>
                 #end
 
diff --git a/cassandra/src/main/scala/com/datastax/driver/core/TableMetadataWrapper.scala b/cassandra/src/main/scala/com/datastax/driver/core/TableMetadataWrapper.scala
index feddbe1..489b6fa 100644
--- a/cassandra/src/main/scala/com/datastax/driver/core/TableMetadataWrapper.scala
+++ b/cassandra/src/main/scala/com/datastax/driver/core/TableMetadataWrapper.scala
@@ -16,8 +16,10 @@
  */
 package com.datastax.driver.core
 
+import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata
+
 case class TableMetadataWrapper(val meta: TableMetadata) {
     def exportTableOnlyAsString(): String = {
-      meta.asCQLQuery(true)
+      meta.describe(true)
     }
 }
diff --git a/cassandra/src/main/scala/org/apache/zeppelin/cassandra/BoundValuesParser.scala b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/BoundValuesParser.scala
index 3268650..5ebc507 100644
--- a/cassandra/src/main/scala/org/apache/zeppelin/cassandra/BoundValuesParser.scala
+++ b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/BoundValuesParser.scala
@@ -16,18 +16,15 @@
  */
 package org.apache.zeppelin.cassandra
 
-import java.text.SimpleDateFormat
-import java.util.{Date}
-
+import scala.util.matching.Regex
 import scala.util.parsing.combinator._
 
 /**
  * Parser of bound values passed into @bind parameters
  */
 class BoundValuesParser extends RegexParsers with JavaTokenParsers {
-
-  val STANDARD_DATE_PATTERN = """(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})""".r
-  val ACCURATE_DATE_PATTERN = """(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3})""".r
+  val STANDARD_DATE_PATTERN: Regex  = """(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})""".r
+  val ACCURATE_DATE_PATTERN: Regex = """(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3})""".r
 
   def value : Parser[String] = "null" | "true" | "false" | zeppelinVariable |
     map | list | set | tuple| udt |
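The newly annotated date patterns are self-contained and can be exercised directly. Note that the standard pattern also matches the prefix of a millisecond timestamp, which is why the parser distinguishes the two forms:

```scala
import scala.util.matching.Regex

// Same patterns as above: timestamps without and with milliseconds.
val STANDARD_DATE_PATTERN: Regex = """(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})""".r
val ACCURATE_DATE_PATTERN: Regex = """(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3})""".r

val standard = STANDARD_DATE_PATTERN.findFirstIn("2020-03-25 18:12:23")
val accurate = ACCURATE_DATE_PATTERN.findFirstIn("2020-03-25 18:12:23.123")
val noMillis = ACCURATE_DATE_PATTERN.findFirstIn("2020-03-25 18:12:23")
```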
diff --git a/cassandra/src/main/scala/org/apache/zeppelin/cassandra/DisplaySystem.scala b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/DisplaySystem.scala
index 135d4cd..382bd46 100644
--- a/cassandra/src/main/scala/org/apache/zeppelin/cassandra/DisplaySystem.scala
+++ b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/DisplaySystem.scala
@@ -18,13 +18,19 @@ package org.apache.zeppelin.cassandra
 
 import java.util.UUID
 
-import com.datastax.driver.core.utils.UUIDs
 import org.apache.zeppelin.cassandra.MetaDataHierarchy._
 import org.fusesource.scalate.TemplateEngine
 
 import scala.collection.JavaConverters._
-
+import scala.compat.java8.OptionConverters._
 import com.datastax.driver.core._
+import com.datastax.oss.driver.api.core.CqlIdentifier
+import com.datastax.oss.driver.api.core.`type`.UserDefinedType
+import com.datastax.oss.driver.api.core.`type`.codec.registry.CodecRegistry
+import com.datastax.oss.driver.api.core.cql.ExecutionInfo
+import com.datastax.oss.driver.api.core.metadata.{Metadata, Node}
+import com.datastax.oss.driver.api.core.metadata.schema.{AggregateMetadata, ColumnMetadata, FunctionMetadata, FunctionSignature, IndexMetadata, KeyspaceMetadata, RelationMetadata, TableMetadata, ViewMetadata}
+import com.datastax.oss.driver.api.core.uuid.Uuids
 
 import scala.collection.immutable.ListMap
 
@@ -61,7 +67,14 @@ object DisplaySystem {
   val CLUSTER_CONTENT_TEMPLATE = "scalate/clusterContent.ssp"
   val KEYSPACE_CONTENT_TEMPLATE = "scalate/keyspaceContent.ssp"
 
-
+  val DEFAULT_CLUSTER_NAME = "Test Cluster"
+  def clusterName(meta: Metadata): String = {
+    if (meta != null) {
+      meta.getClusterName.orElse(DEFAULT_CLUSTER_NAME)
+    } else {
+      DEFAULT_CLUSTER_NAME
+    }
+  }
 
   object TableDisplay {
 
@@ -70,60 +83,61 @@ object DisplaySystem {
     }
 
     protected[DisplaySystem] def formatWithoutMenu(meta: TableMetadata, withCaption: Boolean): String = {
-      val tableName: String = meta.getName
+      val tableName: String = meta.getName.asCql(true)
       val columnsDetails = MetaDataConverter.Table.tableMetaToColumnDetails(meta)
       val indicesDetails = MetaDataConverter.Table.tableMetaToIndexDetails(meta)
       val indicesAsCQL = indicesDetails.map(_.asCQL).mkString("\n")
 
       engine.layout(TABLE_DETAILS_TEMPLATE,
-        Map[String, Any]("tableDetails" -> TableDetails(tableName, columnsDetails, indicesDetails, TableMetadataWrapper(meta).exportTableOnlyAsString(), indicesAsCQL), "withCaption" -> withCaption))
+        Map[String, Any]("tableDetails" -> TableDetails(tableName, columnsDetails, indicesDetails,
+          TableMetadataWrapper(meta).exportTableOnlyAsString(), indicesAsCQL), "withCaption" -> withCaption))
     }
   }
 
   object UDTDisplay {
-    def format(statement: String, userType: UserType, withCaption: Boolean): String = {
+    def format(statement: String, userType: UserDefinedType, withCaption: Boolean): String = {
       MenuDisplay.formatMenu(statement) ++ formatWithoutMenu(userType, withCaption)
     }
 
-    protected[DisplaySystem] def formatWithoutMenu(userType: UserType, withCaption: Boolean): String = {
-      val udtName: String = userType.getTypeName
+    protected[DisplaySystem] def formatWithoutMenu(userType: UserDefinedType, withCaption: Boolean): String = {
+      val udtName: String = userType.getName.asCql(true)
       val columnsDetails = MetaDataConverter.UDT.userTypeToColumnDetails(userType)
 
       engine.layout(UDT_DETAILS_TEMPLATE,
-        Map[String, Any]("udtDetails" -> UDTDetails(udtName,columnsDetails,userType.exportAsString()), "withCaption" -> withCaption))
+        Map[String, Any]("udtDetails" -> UDTDetails(udtName, columnsDetails, userType.describe(true)), "withCaption" -> withCaption))
     }
   }
 
   object FunctionDisplay {
-    def format(statement: String, functions: List[FunctionMetadata], withCaption: Boolean): String = {
+    def format(statement: String, functions: Seq[FunctionMetadata], withCaption: Boolean): String = {
       MenuDisplay.formatMenu(statement) ++ formatWithoutMenu(functions, withCaption)
     }
 
-    protected[DisplaySystem] def formatWithoutMenu(functions: List[FunctionMetadata], withCaption: Boolean): String = {
-      val functionDetails: List[FunctionDetails] = functions.map(MetaDataConverter.functionMetaToFunctionDetails(_))
+    protected[DisplaySystem] def formatWithoutMenu(functions: Seq[FunctionMetadata], withCaption: Boolean): String = {
+      val functionDetails: Seq[FunctionDetails] = functions.map(MetaDataConverter.functionMetaToFunctionDetails)
       engine.layout(FUNCTION_DETAILS_TEMPLATE,
         Map[String, Any]("sameNameFunctionDetails" -> SameNameFunctionDetails(functionDetails), "withCaption" -> withCaption))
     }
   }
 
   object AggregateDisplay {
-    def format(statement: String, aggregates: List[AggregateMetadata], withCaption: Boolean, codecRegistry: CodecRegistry): String = {
+    def format(statement: String, aggregates: Seq[AggregateMetadata], withCaption: Boolean, codecRegistry: CodecRegistry): String = {
       MenuDisplay.formatMenu(statement) ++ formatWithoutMenu(aggregates, withCaption, codecRegistry)
     }
 
-    protected[DisplaySystem] def formatWithoutMenu(aggregates: List[AggregateMetadata], withCaption: Boolean, codecRegistry: CodecRegistry): String = {
-      val aggDetails: List[AggregateDetails] = aggregates.map(agg => MetaDataConverter.aggregateMetaToAggregateDetails(codecRegistry, agg))
+    protected[DisplaySystem] def formatWithoutMenu(aggregates: Seq[AggregateMetadata], withCaption: Boolean, codecRegistry: CodecRegistry): String = {
+      val aggDetails: Seq[AggregateDetails] = aggregates.map(agg => MetaDataConverter.aggregateMetaToAggregateDetails(codecRegistry, agg))
       engine.layout(AGGREGATE_DETAILS_TEMPLATE,
         Map[String, Any]("sameNameAggregateDetails" -> SameNameAggregateDetails(aggDetails), "withCaption" -> withCaption))
     }
   }
 
   object MaterializedViewDisplay {
-    def format(statement: String, mv: MaterializedViewMetadata, withCaption: Boolean): String = {
+    def format(statement: String, mv: ViewMetadata, withCaption: Boolean): String = {
       MenuDisplay.formatMenu(statement) ++ formatWithoutMenu(mv, withCaption)
     }
 
-    protected[DisplaySystem] def formatWithoutMenu(mv: MaterializedViewMetadata, withCaption: Boolean): String = {
+    protected[DisplaySystem] def formatWithoutMenu(mv: ViewMetadata, withCaption: Boolean): String = {
       val mvDetails = MetaDataConverter.mvMetaToMaterializedViewDetails(mv)
       engine.layout(MATERIALIZED_VIEW_DETAILS_TEMPLATE,
         Map[String, Any]("mvDetails" -> mvDetails, "withCaption" -> withCaption))
@@ -133,44 +147,49 @@ object DisplaySystem {
   object KeyspaceDisplay {
 
     private def formatCQLQuery(cql: String): String = {
-      cql.replaceAll(""" WITH REPLICATION = \{"""," WITH REPLICATION = \\{")
-        .replaceAll("('[^']+'\\s*:\\s+'[^']+',?)","\n\t$1")
-        .replaceAll(""" \} AND DURABLE_WRITES = """," \\}\nAND DURABLE_WRITES = ")
+      cql.replaceAll(""" WITH REPLICATION = \{""", " WITH REPLICATION = \\{")
+        .replaceAll("('[^']+'\\s*:\\s+'[^']+',?)", "\n\t$1")
+        .replaceAll(""" \} AND DURABLE_WRITES = """, " \\}\nAND DURABLE_WRITES = ")
     }
 
     protected[cassandra] def formatKeyspaceOnly(meta: KeyspaceMetadata, withCaption: Boolean): String = {
-      val ksDetails = KeyspaceDetails(meta.getName,
+      val ksDetails = KeyspaceDetails(meta.getName.asCql(true),
         meta.getReplication.asScala.toMap,
         meta.isDurableWrites,
-        formatCQLQuery(meta.asCQLQuery()))
+        formatCQLQuery(meta.describe(true)))
 
       engine.layout(KEYSPACE_DETAILS_TEMPLATE,
         Map[String, Any]("ksDetails" -> ksDetails, "withCaption" -> withCaption))
     }
 
     def formatKeyspaceContent(statement: String, meta: KeyspaceMetadata, codecRegistry: CodecRegistry): String = {
-      val ksName: String = meta.getName
-      val ksDetails = formatKeyspaceOnly(meta, true)
-
-      val tableDetails: List[(UUID, String, String)] = meta.getTables.asScala.toList
-        .sortBy(_.getName)
-        .map(table => (UUIDs.timeBased(), table.getName, TableDisplay.formatWithoutMenu(table, false)))
-
-      val viewDetails: List[(UUID, String, String)] = meta.getMaterializedViews.asScala.toList
-        .sortBy(_.getName)
-        .map(view => (UUIDs.timeBased(), view.getName, MaterializedViewDisplay.formatWithoutMenu(view, false)))
-
-      val udtDetails: List[(UUID, String, String)] = meta.getUserTypes.asScala.toList
-        .sortBy(_.getTypeName)
-        .map(udt => (UUIDs.timeBased(), udt.getTypeName, UDTDisplay.formatWithoutMenu(udt, false)))
-
-      val functionDetails: List[(UUID, String, String)] = meta.getFunctions.asScala.toList
-        .sortBy(_.getSimpleName)
-        .map(function => (UUIDs.timeBased(), function.getSimpleName, FunctionDisplay.formatWithoutMenu(List(function), false)))
-
-      val aggregateDetails: List[(UUID, String, String)] = meta.getAggregates.asScala.toList
-        .sortBy(_.getSimpleName)
-        .map(agg => (UUIDs.timeBased(), agg.getSimpleName, AggregateDisplay.formatWithoutMenu(List(agg), false, codecRegistry)))
+      val ksName: String = meta.getName.asCql(true)
+      val ksDetails = formatKeyspaceOnly(meta, withCaption = true)
+
+      val tableDetails: Seq[(UUID, String, String)] = meta.getTables.asScala.toSeq
+        .sortBy(_._1.asCql(true))
+        .map(t => (Uuids.timeBased(), t._1.asCql(true),
+          TableDisplay.formatWithoutMenu(t._2, withCaption = false)))
+
+      val viewDetails: Seq[(UUID, String, String)] = meta.getViews.asScala.toSeq
+        .sortBy(_._1.asCql(true))
+        .map(v => (Uuids.timeBased(), v._1.asCql(true),
+          MaterializedViewDisplay.formatWithoutMenu(v._2, withCaption = false)))
+
+      val udtDetails: Seq[(UUID, String, String)] = meta.getUserDefinedTypes.asScala.toSeq
+        .sortBy(_._1.asCql(true))
+        .map(udt => (Uuids.timeBased(), udt._1.asCql(true),
+          UDTDisplay.formatWithoutMenu(udt._2, withCaption = false)))
+
+      val functionDetails: Seq[(UUID, String, String)] = meta.getFunctions.asScala.toSeq
+        .sortBy(_._1.getName.asCql(true))
+        .map(f => (Uuids.timeBased(), f._1.getName.asCql(true), FunctionDisplay.formatWithoutMenu(List(f._2),
+          withCaption = false)))
+
+      val aggregateDetails: Seq[(UUID, String, String)] = meta.getAggregates.asScala.toSeq
+        .sortBy(_._1.getName.asCql(true))
+        .map(a => (Uuids.timeBased(), a._1.getName.asCql(true), AggregateDisplay.formatWithoutMenu(List(a._2),
+          withCaption = false, codecRegistry)))
 
       val ksContent: KeyspaceContent = KeyspaceContent(ksName, ksDetails, tableDetails, viewDetails,
         udtDetails, functionDetails, aggregateDetails)
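The private `formatCQLQuery` helper touched in the hunk above is plain string munging and behaves the same against either driver's CQL export. Copied out with an illustrative keyspace DDL string:

```scala
// Pretty-prints a keyspace's CQL export: each replication option and the
// DURABLE_WRITES clause move onto their own line. Same regexes as above.
def formatCQLQuery(cql: String): String =
  cql.replaceAll(""" WITH REPLICATION = \{""", " WITH REPLICATION = \\{")
    .replaceAll("('[^']+'\\s*:\\s+'[^']+',?)", "\n\t$1")
    .replaceAll(""" \} AND DURABLE_WRITES = """, " \\}\nAND DURABLE_WRITES = ")

val formatted = formatCQLQuery(
  "CREATE KEYSPACE ks WITH REPLICATION = { 'class': 'SimpleStrategy', " +
  "'replication_factor': '1' } AND DURABLE_WRITES = true;")
```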
@@ -181,25 +200,33 @@ object DisplaySystem {
     }
   }
 
+  // cluster name is resolved explicitly because of the limitations of the driver.
+  // TODO(alex): remove it after the driver is fixed
   object ClusterDisplay {
 
-    def formatClusterOnly(statement: String, meta: Metadata, withMenu: Boolean = true): String = {
-      val clusterDetails: ClusterDetails = ClusterDetails(meta.getClusterName, meta.getPartitioner)
+
+    def formatClusterOnly(statement: String, meta: Metadata,
+                          withMenu: Boolean = true): String = {
+      val partitioner: String = if (meta.getTokenMap.isPresent)
+        meta.getTokenMap.get().getPartitionerName
+      else
+        "Unknown partitioner"
+      val clusterDetails: ClusterDetails = ClusterDetails(clusterName(meta), partitioner)
       val content: String = engine.layout(CLUSTER_DETAILS_TEMPLATE,
         Map[String, Any]("clusterDetails" -> clusterDetails))
 
-      if(withMenu) MenuDisplay.formatMenu(statement) + content else content
+      if (withMenu) MenuDisplay.formatMenu(statement) + content else content
     }
 
     def formatClusterContent(statement: String, meta: Metadata): String = {
-      val clusterName: String = meta.getClusterName
-      val clusterDetails: String = formatClusterOnly(statement, meta, false)
+      val clusterDetails: String = formatClusterOnly(statement, meta, withMenu = false)
 
-      val keyspaceDetails: List[(UUID, String, String)] = meta.getKeyspaces.asScala.toList
-        .sortBy(ks => ks.getName)
-        .map(ks => (UUIDs.timeBased(), ks.getName, KeyspaceDisplay.formatKeyspaceOnly(ks, false)))
+      val keyspaceDetails: Seq[(UUID, String, String)] = meta.getKeyspaces.asScala.toSeq
+        .sortBy(_._1.asCql(true))
+        .map(ks => (Uuids.timeBased(), ks._1.asCql(true),
+          KeyspaceDisplay.formatKeyspaceOnly(ks._2, withCaption = false)))
 
-      val clusterContent: ClusterContent = ClusterContent(clusterName, clusterDetails, keyspaceDetails)
+      val clusterContent: ClusterContent = ClusterContent(clusterName(meta), clusterDetails, keyspaceDetails)
 
       MenuDisplay.formatMenuForCluster(statement, clusterContent) +
         engine.layout(CLUSTER_CONTENT_TEMPLATE,
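Driver 4.x exposes keyspaces as a `Map[CqlIdentifier, KeyspaceMetadata]` rather than a collection of metadata objects, which is why the hunk above sorts map entries by their key (`_._1.asCql(true)`) instead of calling `getName` on list elements. A plain-Scala sketch of the pattern, with strings standing in for `CqlIdentifier` and `UUID.randomUUID` for `Uuids.timeBased`:

```scala
import java.util.UUID

// Entries come back keyed by identifier; sort by the key, then build the
// (id, name, details) triples the templates expect.
val keyspaces = Map("system" -> "system details", "app" -> "app details")

val keyspaceDetails: Seq[(UUID, String, String)] = keyspaces.toSeq
  .sortBy(_._1)                                      // sort by keyspace name
  .map { case (name, details) => (UUID.randomUUID(), name, details) }
```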
@@ -207,26 +234,26 @@ object DisplaySystem {
     }
 
     def formatAllTables(statement: String, meta: Metadata): String = {
-      val ksMetas: List[KeyspaceMetadata] = meta.getKeyspaces.asScala.toList
-        .filter(_.getTables.size > 0)
-        .sortBy(ks => ks.getName)
-      if(ksMetas.isEmpty) {
+      val ksMetas: Seq[KeyspaceMetadata] = meta.getKeyspaces.asScala.toSeq
+        .filter(_._2.getTables.size > 0)
+        .sortBy(_._1.asCql(true))
+        .map(_._2)
+      if (ksMetas.isEmpty) {
         NoResultDisplay.formatNoResult
       } else {
-        val allTables: Map[(UUID, String), List[String]] = ListMap.empty ++
+        val allTables: Map[(UUID, String), Seq[String]] = ListMap.empty ++
           ksMetas
             .map(ks => {
-              ((UUIDs.timeBased(), ks.getName),
-                ks.getTables.asScala.toList.map(table => table.getName).sortBy(name => name))
+              ((Uuids.timeBased(), ks.getName.asCql(true)),
+                ks.getTables.asScala.toList.map(_._1.asCql(true)).sorted)
             })
-            .sortBy{case ((id,name), _) => name}
+            .sortBy { case ((_, name), _) => name }
 
+        val keyspaceDetails: Seq[(UUID, String, String)] = allTables
+          .keySet.toSeq.sortBy { case (_, ksName) => ksName }
+          .map { case (id, ksName) => (id, ksName, "") }
 
-        val keyspaceDetails: List[(UUID, String, String)] = allTables
-          .keySet.toList.sortBy{case(id,ksName) => ksName}
-          .map{case(id,ksName) => (id,ksName, "")}
-
-        val clusterContent: ClusterContent = ClusterContent(meta.getClusterName, "", keyspaceDetails)
+        val clusterContent: ClusterContent = ClusterContent(clusterName(meta), "", keyspaceDetails)
 
         MenuDisplay.formatMenuForCluster(statement, clusterContent) +
           engine.layout(ALL_TABLES_TEMPLATE,
@@ -235,27 +262,27 @@ object DisplaySystem {
     }
 
     def formatAllUDTs(statement: String, meta: Metadata): String = {
-      val ksMetas: List[KeyspaceMetadata] = meta.getKeyspaces.asScala.toList
-        .filter(_.getUserTypes.size > 0)
-        .sortBy(ks => ks.getName)
+      val ksMetas: Seq[KeyspaceMetadata] = meta.getKeyspaces.asScala.toSeq
+        .filter(_._2.getUserDefinedTypes.size > 0)
+        .sortBy(_._1.asCql(false))
+        .map(_._2)
 
-      if(ksMetas.isEmpty) {
+      if (ksMetas.isEmpty) {
         NoResultDisplay.formatNoResult
       } else {
-        val allUDTs: Map[(UUID, String), List[String]] = ListMap.empty ++
+        val allUDTs: Map[(UUID, String), Seq[String]] = ListMap.empty ++
           ksMetas
-            .map(ks => {
-              ((UUIDs.timeBased(), ks.getName),
-                ks.getUserTypes.asScala.toList.map(udt => udt.getTypeName).sortBy(name => name))
+            .map ( ks => {
+              ((Uuids.timeBased(), ks.getName.asCql(true)),
+                ks.getUserDefinedTypes.asScala.toSeq.map(_._1.asCql(true)).sorted)
             })
-            .sortBy { case ((id, name), _) => name }
-
+            .sortBy { case ((_, name), _) => name }
 
         val keyspaceDetails: List[(UUID, String, String)] = allUDTs
-          .keySet.toList.sortBy { case (id, ksName) => ksName }
+          .keySet.toList.sortBy { case (_, ksName) => ksName }
           .map { case (id, ksName) => (id, ksName, "") }
 
-        val clusterContent: ClusterContent = ClusterContent(meta.getClusterName, "", keyspaceDetails)
+        val clusterContent: ClusterContent = ClusterContent(clusterName(meta), "", keyspaceDetails)
 
         MenuDisplay.formatMenuForCluster(statement, clusterContent) +
           engine.layout(ALL_UDTS_TEMPLATE,
@@ -264,29 +291,29 @@ object DisplaySystem {
     }
 
     def formatAllFunctions(statement: String, meta: Metadata): String = {
-      val ksMetas: List[KeyspaceMetadata] = meta.getKeyspaces.asScala.toList
-        .filter(_.getFunctions.size > 0)
-        .sortBy(ks => ks.getName)
+      val ksMetas: Seq[KeyspaceMetadata] = meta.getKeyspaces.asScala.toSeq
+        .filter(_._2.getFunctions.size > 0)
+        .sortBy(_._1.asCql(true))
+        .map(_._2)
 
-      if(ksMetas.isEmpty) {
+      if (ksMetas.isEmpty) {
         NoResultDisplay.formatNoResult
       } else {
-        val allFunctions: Map[(UUID, String), List[FunctionSummary]] = ListMap.empty ++
+        val allFunctions: Map[(UUID, String), Seq[FunctionSummary]] =
           ksMetas
             .map(ks => {
-              ((UUIDs.timeBased(), ks.getName),
-                ks.getFunctions.asScala.toList
-                  .map(MetaDataConverter.functionMetaToFunctionSummary(_))
+              ((Uuids.timeBased(), ks.getName.asCql(true)),
+                ks.getFunctions.asScala
+                  .map { case (sig, meta) => MetaDataConverter.functionMetaToFunctionSummary(sig, meta) }
+                  .toSeq
                   .sortBy(_.name))
-            })
-            .sortBy { case ((id, name), _) => name }
+            }).toMap
 
         val keyspaceDetails: List[(UUID, String, String)] = allFunctions
-          .keySet.toList.sortBy { case (id, ksName) => ksName }
+          .keySet.toList.sortBy { case (_, ksName) => ksName }
           .map { case (id, ksName) => (id, ksName, "") }
 
-
-        val clusterContent: ClusterContent = ClusterContent(meta.getClusterName, "", keyspaceDetails)
+        val clusterContent: ClusterContent = ClusterContent(clusterName(meta), "", keyspaceDetails)
 
         MenuDisplay.formatMenuForCluster(statement, clusterContent) +
           engine.layout(ALL_FUNCTIONS_TEMPLATE,
@@ -295,29 +322,28 @@ object DisplaySystem {
     }
 
     def formatAllAggregates(statement: String, meta: Metadata): String = {
-      val ksMetas: List[KeyspaceMetadata] = meta.getKeyspaces.asScala.toList
-        .filter(_.getAggregates.size > 0)
-        .sortBy(ks => ks.getName)
+      val ksMetas = meta.getKeyspaces.asScala.toList
+        .filter(_._2.getAggregates.size > 0)
+        .sortBy(_._1.asCql(false))
 
-      if(ksMetas.isEmpty) {
+      if (ksMetas.isEmpty) {
         NoResultDisplay.formatNoResult
       } else {
-        val allAggregates: Map[(UUID, String), List[AggregateSummary]] = ListMap.empty ++
+        val allAggregates: Map[(UUID, String), Seq[AggregateSummary]] =
           ksMetas
-            .map(ks => {
-              ((UUIDs.timeBased(), ks.getName),
-                ks.getAggregates.asScala.toList
-                  .map(MetaDataConverter.aggregateMetaToAggregateSummary(_))
+            .map { case (id, ks) =>
+              ((Uuids.timeBased(), id.asCql(true)),
+                ks.getAggregates.asScala
+                  .map { case (sig, meta) => MetaDataConverter.aggregateMetaToAggregateSummary(sig, meta) }
+                  .toSeq
                   .sortBy(_.name))
-            })
-            .sortBy { case ((id, name), _) => name }
+            }.toMap
 
         val keyspaceDetails: List[(UUID, String, String)] = allAggregates
           .keySet.toList.sortBy { case (id, ksName) => ksName }
           .map { case (id, ksName) => (id, ksName, "") }
 
-
-        val clusterContent: ClusterContent = ClusterContent(meta.getClusterName, "", keyspaceDetails)
+        val clusterContent: ClusterContent = ClusterContent(clusterName(meta), "", keyspaceDetails)
 
         MenuDisplay.formatMenuForCluster(statement, clusterContent) +
           engine.layout(ALL_AGGREGATES_TEMPLATE,
@@ -326,29 +352,28 @@ object DisplaySystem {
     }
 
     def formatAllMaterializedViews(statement: String, meta: Metadata): String = {
-      val ksMetas: List[KeyspaceMetadata] = meta.getKeyspaces.asScala.toList
-        .filter(_.getMaterializedViews.size > 0)
-        .sortBy(ks => ks.getName)
+      val ksMetas: Seq[KeyspaceMetadata] = meta.getKeyspaces.asScala.toSeq
+        .filter(_._2.getViews.size > 0)
+        .sortBy(_._1.asCql(false))
+        .map(_._2)
 
-      if(ksMetas.isEmpty) {
+      if (ksMetas.isEmpty) {
         NoResultDisplay.formatNoResult
       } else {
-        val allMVs: Map[(UUID, String), List[MaterializedViewSummary]] = ListMap.empty ++
+        val allMVs: Map[(UUID, String), Seq[MaterializedViewSummary]] = ListMap.empty ++
           ksMetas
-            .map(ks => {
-              ((UUIDs.timeBased(), ks.getName),
-                ks.getMaterializedViews.asScala.toList
-                  .map(MetaDataConverter.mvMetaToMaterializedViewSummary(_))
-                  .sortBy(_.name))
-            })
-            .sortBy { case ((id, name), _) => name }
-
-        val keyspaceDetails: List[(UUID, String, String)] = allMVs
-          .keySet.toList.sortBy { case (id, ksName) => ksName }
+            .map(ks =>
+              ((Uuids.timeBased(), ks.getName.asCql(true)),
+                ks.getViews.asScala.values.toSeq
+                  .map(MetaDataConverter.mvMetaToMaterializedViewSummary)
+                  .sortBy(_.name)))
+            .sortBy { case ((_, name), _) => name }
+
+        val keyspaceDetails: Seq[(UUID, String, String)] = allMVs
+          .keySet.toList.sortBy { case (_, ksName) => ksName }
           .map { case (id, ksName) => (id, ksName, "") }
 
-
-        val clusterContent: ClusterContent = ClusterContent(meta.getClusterName, "", keyspaceDetails)
+        val clusterContent: ClusterContent = ClusterContent(clusterName(meta), "", keyspaceDetails)
 
         MenuDisplay.formatMenuForCluster(statement, clusterContent) +
           engine.layout(ALL_MATERIALIZED_VIEWS_TEMPLATE,
@@ -368,16 +393,20 @@ object DisplaySystem {
 
     val formatNoResult: String = engine.layout("/scalate/noResult.ssp")
 
+    def nodeToString(node: Node): String = {
+      node.getEndPoint.toString
+    }
+
     def noResultWithExecutionInfo(lastQuery: String, execInfo: ExecutionInfo): String = {
-      val consistency = Option(execInfo.getAchievedConsistencyLevel).getOrElse("N/A")
-      val queriedHosts = execInfo.getQueriedHost.toString.replaceAll("/","").replaceAll("""\[""","").replaceAll("""\]""","")
-      val triedHosts = execInfo.getTriedHosts.toString.replaceAll("/","").replaceAll("""\[""","").replaceAll("""\]""","")
+      val queriedHosts = nodeToString(execInfo.getCoordinator)
+      val triedHosts = (execInfo.getErrors.asScala.map(x => nodeToString(x.getKey))
+        .toSet + queriedHosts).mkString(", ")
       val schemaInAgreement = Option(execInfo.isSchemaInAgreement).map(_.toString).getOrElse("N/A")
 
       engine.layout("/scalate/noResultWithExecutionInfo.ssp",
-        Map[String,Any]("query" -> lastQuery, "consistency" -> consistency,
-                        "triedHosts" -> triedHosts, "queriedHosts" -> queriedHosts,
-                        "schemaInAgreement" -> schemaInAgreement))
+        Map[String, Any]("query" -> lastQuery,
+          "triedHosts" -> triedHosts, "queriedHosts" -> queriedHosts,
+          "schemaInAgreement" -> schemaInAgreement))
     }
   }
 
@@ -402,6 +431,7 @@ object DisplaySystem {
       formatMenu(statement, dropDownMenu)
     }
   }
+
 }
 
 class ColumnMetaWrapper(val columnMeta: ColumnMetadata) {
@@ -413,7 +443,11 @@ class ColumnMetaWrapper(val columnMeta: ColumnMetadata) {
     case _ => false
   }
 
-  override def hashCode: Int = columnMeta.getName.hashCode
+  override def hashCode: Int = columnMeta.getName.asCql(true).hashCode
+}
+
+object ColumnMetaWrapper {
+  def apply(columnMeta: ColumnMetadata): ColumnMetaWrapper = new ColumnMetaWrapper(columnMeta)
 }
 
 /**
@@ -423,153 +457,156 @@ class ColumnMetaWrapper(val columnMeta: ColumnMetadata) {
  */
 object MetaDataConverter {
 
-  type DriverClusteringOrder = com.datastax.driver.core.ClusteringOrder
+  type DriverClusteringOrder = com.datastax.oss.driver.api.core.metadata.schema.ClusteringOrder
 
   def functionMetaToFunctionDetails(function: FunctionMetadata): FunctionDetails = {
-    new FunctionDetails(function.getKeyspace.getName,
-      function.getSimpleName,
-      function.getArguments.asScala
-        .toMap
-        .map{case(paramName, dataType) => paramName + " " + dataType.asFunctionParameterString()}
+    val signature = function.getSignature
+    FunctionDetails(function.getKeyspace.asCql(true),
+      signature.getName.asCql(true),
+      function.getParameterNames.asScala.zip(signature.getParameterTypes.asScala)
+        .map { case (paramName, dataType) => paramName + " " + dataType.asCql(true, true) }
         .toList,
       function.isCalledOnNullInput,
-      function.getReturnType.asFunctionParameterString(),
+      function.getReturnType.asCql(true, true),
       function.getLanguage,
       function.getBody,
-      function.exportAsString())
+      function.describe(true))
   }
 
-  def functionMetaToFunctionSummary(function: FunctionMetadata): FunctionSummary = {
-    new FunctionSummary(function.getKeyspace.getName,
-      function.getSimpleName,
-      function.getArguments.asScala.toMap
-        .mapValues(dataType => dataType.asFunctionParameterString())
-        .values.toList,
-      function.getReturnType.asFunctionParameterString()
+  def functionMetaToFunctionSummary(signature: FunctionSignature, function: FunctionMetadata): FunctionSummary = {
+    FunctionSummary(function.getKeyspace.asCql(true),
+      signature.getName.asCql(true),
+      function.getParameterNames.asScala.zip(signature.getParameterTypes.asScala)
+        .map { case (paramName, dataType) => paramName + " " + dataType.asCql(true, true) }
+        .toList,
+      function.getReturnType.asCql(true, true)
     )
   }
 
   def aggregateMetaToAggregateDetails(codecRegistry: CodecRegistry, aggregate: AggregateMetadata): AggregateDetails = {
-    val sFunc: FunctionSummary = functionMetaToFunctionSummary(aggregate.getStateFunc)
-    val finalFunc: Option[String] = Option(aggregate.getFinalFunc).map(func => {
-      val finalFunction = functionMetaToFunctionSummary(func)
-      finalFunction.name + finalFunction.arguments.mkString("(", ", ", ")")
-    })
     val sType = aggregate.getStateType
-    val initCond: Option[String] = Option(aggregate.getInitCond).map(codecRegistry.codecFor(sType).format(_))
-    val returnType: String = Option(aggregate.getFinalFunc) match {
-      case Some(finalFunc) => functionMetaToFunctionSummary(finalFunc).returnType
-      case None => sFunc.returnType
-    }
-
-    new AggregateDetails(aggregate.getKeyspace.getName,
-      aggregate.getSimpleName,
-      aggregate.getArgumentTypes.asScala.toList.map(_.asFunctionParameterString()),
-      sFunc.name + sFunc.arguments.mkString("(",", ", ")"),
-      sType.asFunctionParameterString(),
+    val initCond: Option[String] = Option(aggregate.getInitCond)
+      .map(value => codecRegistry.codecFor(sType).format(value.get))
+    val returnType = aggregate.getReturnType
+    val finalFunc: Option[String] = aggregate.getFinalFuncSignature.asScala
+      .map(x => x.getName.asCql(true) +
+        x.getParameterTypes.asScala.map(_.asCql(true, true)).mkString("(", ", ", ")"))
+
+    val sFunc = aggregate.getStateFuncSignature
+    val sFuncString = sFunc.getName.asCql(true) + sFunc.getParameterTypes.asScala
+      .map(_.asCql(true, true)).mkString("(", ", ", ")")
+
+    AggregateDetails(aggregate.getKeyspace.asCql(true),
+      aggregate.getSignature.getName.asCql(true),
+      aggregate.getSignature.getParameterTypes.asScala.map(_.asCql(true, true)),
+      sFuncString,
+      sType.asCql(true, true),
       finalFunc,
       initCond,
-      returnType,
-      aggregate.exportAsString())
+      returnType.asCql(true, true),
+      aggregate.describe(true)
+    )
   }
 
-  def aggregateMetaToAggregateSummary(aggregate: AggregateMetadata): AggregateSummary = {
-    val returnType: String = Option(aggregate.getFinalFunc) match {
-      case Some(finalFunc) => functionMetaToFunctionSummary(finalFunc).returnType
-      case None => aggregate.getStateType.asFunctionParameterString()
-    }
+  def aggregateMetaToAggregateSummary(signature: FunctionSignature, aggregate: AggregateMetadata): AggregateSummary = {
+    val returnType: String = aggregate.getReturnType.asCql(true, true)
 
-    new AggregateSummary(aggregate.getKeyspace.getName,
-      aggregate.getSimpleName,
-      aggregate.getArgumentTypes.asScala.toList.map(_.asFunctionParameterString()),
+    AggregateSummary(aggregate.getKeyspace.asCql(true),
+      signature.getName.asCql(true),
+      signature.getParameterTypes.asScala.map(_.asCql(true, true)),
       returnType
     )
   }
 
-  def mvMetaToMaterializedViewDetails(mv: MaterializedViewMetadata): MaterializedViewDetails = {
-    new MaterializedViewDetails(mv.getName, MV.mvMetaToColumnDetails(mv), mv.exportAsString(), mv.getBaseTable.getName)
+  def mvMetaToMaterializedViewDetails(mv: ViewMetadata): MaterializedViewDetails = {
+    MaterializedViewDetails(mv.getName.asCql(true), MV.mvMetaToColumnDetails(mv), mv.describe(true),
+      mv.getBaseTable.asCql(true))
   }
 
-  def mvMetaToMaterializedViewSummary(mv: MaterializedViewMetadata): MaterializedViewSummary = {
-    new MaterializedViewSummary(mv.getName, mv.getBaseTable.getName)
+  def mvMetaToMaterializedViewSummary(mv: ViewMetadata): MaterializedViewSummary = {
+    MaterializedViewSummary(mv.getName.asCql(true), mv.getBaseTable.asCql(true))
   }
 
   trait TableOrView {
-    protected def extractNormalColumns(columns: List[ColumnMetaWrapper]): List[ColumnDetails] = {
-      columns
-        .filter(_.columnMeta.isStatic == false)
-        .map(c => new ColumnDetails(c.columnMeta.getName, NormalColumn, c.columnMeta.getType))
+    protected def extractNormalColumns(columns: Seq[ColumnMetaWrapper]): Seq[ColumnDetails] = {
+      columns.filter(_.columnMeta.isStatic == false)
+        .map(c => ColumnDetails(c.columnMeta.getName.asCql(true), NormalColumn, c.columnMeta.getType))
     }
 
-    protected def extractStaticColumns(columns: List[ColumnMetaWrapper]): List[ColumnDetails] = {
-      columns
-        .filter(_.columnMeta.isStatic == true)
-        .map(c => new ColumnDetails(c.columnMeta.getName, StaticColumn, c.columnMeta.getType))
+    protected def extractStaticColumns(columns: Seq[ColumnMetaWrapper]): Seq[ColumnDetails] = {
+      columns.filter(_.columnMeta.isStatic == true)
+        .map(c => ColumnDetails(c.columnMeta.getName.asCql(true), StaticColumn, c.columnMeta.getType))
     }
 
-    protected def convertClusteringColumns(columns: List[ColumnMetaWrapper], orders: List[DriverClusteringOrder]): List[ColumnDetails] = {
-      columns
-        .zip(orders)
-        .map{case(c,order) => new ColumnDetails(c.columnMeta.getName,
-          new ClusteringColumn(OrderConverter.convert(order)),c.columnMeta.getType)}
-
+    protected def convertClusteringColumns(columns: Seq[ColumnMetaWrapper], orders: Seq[DriverClusteringOrder]): Seq[ColumnDetails] = {
+      columns.zip(orders)
+        .map { case (c, order) => ColumnDetails(c.columnMeta.getName.asCql(true),
+          ClusteringColumn(OrderConverter.convert(order)), c.columnMeta.getType)
+        }
     }
 
-    protected def convertPartitionKeys(columns: List[ColumnMetaWrapper]): List[ColumnDetails] = {
+    protected def convertPartitionKeys(columns: Seq[ColumnMetaWrapper]): Seq[ColumnDetails] = {
       columns
-        .map(c => new ColumnDetails(c.columnMeta.getName, PartitionKey, c.columnMeta.getType))
+        .map(c => ColumnDetails(c.columnMeta.getName.asCql(true), PartitionKey, c.columnMeta.getType))
     }
-  }
 
-  object Table extends TableOrView {
-    def tableMetaToColumnDetails(meta: TableMetadata): List[ColumnDetails] = {
-      val partitionKeys: List[ColumnMetaWrapper] = meta.getPartitionKey.asScala.toList.map(new ColumnMetaWrapper(_))
-      val clusteringColumns: List[ColumnMetaWrapper] = meta.getClusteringColumns.asScala.toList.map(new ColumnMetaWrapper(_))
-      val columns: List[ColumnMetaWrapper] = meta.getColumns.asScala.toList.map(new ColumnMetaWrapper(_))
-        .diff(partitionKeys).diff(clusteringColumns)
-      val clusteringOrders = meta.getClusteringOrder.asScala.toList
-
-      convertPartitionKeys(partitionKeys):::
-        extractStaticColumns(columns):::
-        convertClusteringColumns(clusteringColumns, clusteringOrders):::
+    def relationMetaToColumnDetails(meta: RelationMetadata): Seq[ColumnDetails] = {
+      val partitionKeys: Seq[ColumnMetaWrapper] = meta.getPartitionKey.asScala.map(ColumnMetaWrapper(_))
+
+      // mutable arrays are used to preserve the declared column order, which may be lost when converting the Java map via .asScala
+      val clusteringColumnsRaw = meta.getClusteringColumns
+      val clusteringColumns = new Array[ColumnMetaWrapper](clusteringColumnsRaw.size())
+      val clusteringOrders = new Array[DriverClusteringOrder](clusteringColumnsRaw.size())
+      var i = 0
+      for (meta <- clusteringColumnsRaw.keySet().iterator().asScala) {
+        clusteringColumns(i) = ColumnMetaWrapper(meta)
+        clusteringOrders(i) = clusteringColumnsRaw.get(meta)
+        i += 1
+      }
+
+      val primaryKeyNames = meta.getPrimaryKey.asScala.map(_.getName).toSet
+      val columnsRaw = meta.getColumns
+      val columns = new Array[ColumnMetaWrapper](columnsRaw.size() - primaryKeyNames.size)
+      i = 0
+      for (col <- columnsRaw.keySet().iterator().asScala) {
+        if (!primaryKeyNames.contains(col)) {
+          columns(i) = ColumnMetaWrapper(columnsRaw.get(col))
+          i += 1
+        }
+      }
+
+      convertPartitionKeys(partitionKeys) ++
+        extractStaticColumns(columns) ++
+        convertClusteringColumns(clusteringColumns, clusteringOrders) ++
         extractNormalColumns(columns)
     }
 
-    def tableMetaToIndexDetails(meta: TableMetadata): List[IndexDetails] = {
-      meta.getIndexes.asScala.toList
-        .map(index => IndexDetails(index.getName, index.getTarget, index.asCQLQuery()))
-        .sortBy(index => index.name)
-    }
+  }
 
+  object Table extends TableOrView {
+    def tableMetaToColumnDetails(meta: TableMetadata): Seq[ColumnDetails] = {
+      relationMetaToColumnDetails(meta)
+    }
 
+    def tableMetaToIndexDetails(meta: TableMetadata): Seq[IndexDetails] = {
+      meta.getIndexes.asScala.map {
+        case (name: CqlIdentifier, index: IndexMetadata) =>
+          IndexDetails(name.asCql(true), index.getTarget, index.describe(true))
+      }.toSeq.sortBy(_.name)
+    }
   }
 
   object MV extends TableOrView {
-    def mvMetaToColumnDetails(meta: MaterializedViewMetadata): List[ColumnDetails] = {
-      val partitionKeys: List[ColumnMetaWrapper] = meta.getPartitionKey.asScala.toList.map(new ColumnMetaWrapper(_))
-      val clusteringColumns: List[ColumnMetaWrapper] = meta.getClusteringColumns.asScala.toList.map(new ColumnMetaWrapper(_))
-      val columns: List[ColumnMetaWrapper] = meta.getColumns.asScala.toList.map(new ColumnMetaWrapper(_))
-        .diff(partitionKeys).diff(clusteringColumns)
-      val clusteringOrders = meta.getClusteringOrder.asScala.toList
-
-      convertPartitionKeys(partitionKeys):::
-        convertClusteringColumns(clusteringColumns, clusteringOrders):::
-        extractNormalColumns(columns)
+    def mvMetaToColumnDetails(meta: ViewMetadata): Seq[ColumnDetails] = {
+      relationMetaToColumnDetails(meta)
     }
   }
 
   object UDT {
-    def userTypeToColumnDetails(userType: UserType): List[ColumnDetails] = {
-      userType.getFieldNames.asScala.toList
-        .map(name => new ColumnDetails(name, NormalColumn, userType.getFieldType(name)))
+    def userTypeToColumnDetails(userType: UserDefinedType): Seq[ColumnDetails] = {
+      userType.getFieldNames.asScala.zip(userType.getFieldTypes.asScala)
+        .map { case (name, typ) => ColumnDetails(name.asCql(true), NormalColumn, typ) }
     }
   }
-
-
-
 }
-
-
-
-
-
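A recurring change throughout the DisplaySystem refactoring above is that driver 4.x metadata getters (`Metadata.getKeyspace`, `KeyspaceMetadata.getTable`, and friends) return `java.util.Optional` instead of nullable references, which the patch bridges with scala-java8-compat's `asScala`. A minimal runnable sketch of that bridging pattern, with the `asScala` enrichment re-implemented locally so the example has no external dependency; `lookupKeyspace` is a hypothetical stand-in for `Metadata.getKeyspace`:

```scala
import java.util.Optional

object OptionalBridge {
  // Minimal local stand-in for scala-java8-compat's OptionConverters.asScala
  implicit class RichOptional[A](val underlying: Optional[A]) extends AnyVal {
    def asScala: Option[A] = if (underlying.isPresent) Some(underlying.get) else None
  }

  // Hypothetical lookup mimicking the Optional-returning shape of Metadata.getKeyspace
  def lookupKeyspace(name: String): Optional[String] =
    if (name == "system") Optional.of("system") else Optional.empty()

  def main(args: Array[String]): Unit = {
    // Same shape as the interpreter code: Optional -> Option -> match
    lookupKeyspace("system").asScala match {
      case Some(ks) => println(s"found $ks")
      case None     => println("not found")
    }
    // Or with a default, mirroring getKeySpace's getOrElse("system") fallback
    println(lookupKeyspace("missing").asScala.getOrElse("system"))
  }
}
```

Converting to `Option` up front keeps the rest of the code in idiomatic `match`/`flatMap` style instead of `isPresent`/`get` pairs.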
diff --git a/cassandra/src/main/scala/org/apache/zeppelin/cassandra/EnhancedSession.scala b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/EnhancedSession.scala
index 988cdd2..31f5142 100644
--- a/cassandra/src/main/scala/org/apache/zeppelin/cassandra/EnhancedSession.scala
+++ b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/EnhancedSession.scala
@@ -18,20 +18,23 @@ package org.apache.zeppelin.cassandra
 
 import java.util.regex.Pattern
 
-import com.datastax.driver.core._
+import com.datastax.oss.driver.api.core.CqlSession
+import com.datastax.oss.driver.api.core.cql.{BatchStatement, BatchType, BoundStatement, ExecutionInfo, ResultSet, SimpleStatement, Statement}
+import com.datastax.oss.driver.api.core.metadata.Metadata
+import com.datastax.oss.driver.api.core.metadata.schema.{AggregateMetadata, FunctionMetadata, KeyspaceMetadata, ViewMetadata}
 import org.apache.zeppelin.cassandra.TextBlockHierarchy._
 import org.apache.zeppelin.interpreter.InterpreterException
 import org.slf4j.LoggerFactory
 
 import scala.collection.JavaConverters._
+import scala.compat.java8.OptionConverters._
 
 /**
  * Enhance the Java driver session
  * with special statements
  * to describe schema
  */
-class EnhancedSession(val session: Session) {
-
+class EnhancedSession(val session: CqlSession) {
   val clusterDisplay = DisplaySystem.ClusterDisplay
   val keyspaceDisplay = DisplaySystem.KeyspaceDisplay
   val tableDisplay = DisplaySystem.TableDisplay
@@ -41,7 +44,8 @@ class EnhancedSession(val session: Session) {
   val materializedViewDisplay = DisplaySystem.MaterializedViewDisplay
   val helpDisplay = DisplaySystem.HelpDisplay
   private val noResultDisplay = DisplaySystem.NoResultDisplay
-  private val DEFAULT_CHECK_TIME = 200 // half second
+  private val DEFAULT_CHECK_TIME: Int = 200 // milliseconds between schema agreement checks
+  private val MAX_SCHEMA_AGREEMENT_WAIT: Int = 120000 // milliseconds (120 seconds)
   private val LOGGER = LoggerFactory.getLogger(classOf[EnhancedSession])
 
   val HTML_MAGIC = "%html \n"
@@ -53,130 +57,132 @@ class EnhancedSession(val session: Session) {
   }
 
   private def execute(describeCluster: DescribeClusterCmd): String = {
-    val metaData = session.getCluster.getMetadata
+    val metaData = session.getMetadata
     HTML_MAGIC + clusterDisplay.formatClusterOnly(describeCluster.statement, metaData)
   }
 
   private def execute(describeKeyspaces: DescribeKeyspacesCmd): String = {
-    val metaData = session.getCluster.getMetadata
+    val metaData = session.getMetadata
     HTML_MAGIC + clusterDisplay.formatClusterContent(describeKeyspaces.statement, metaData)
   }
 
   private def execute(describeTables: DescribeTablesCmd): String = {
-    val metadata: Metadata = session.getCluster.getMetadata
+    val metadata = session.getMetadata
     HTML_MAGIC + clusterDisplay.formatAllTables(describeTables.statement,metadata)
   }
 
   private def execute(describeKeyspace: DescribeKeyspaceCmd): String = {
     val keyspace: String = describeKeyspace.keyspace
-    val metadata: Option[KeyspaceMetadata] = Option(session.getCluster.getMetadata.getKeyspace(keyspace))
+    val metadata: Option[KeyspaceMetadata] = session.getMetadata.getKeyspace(keyspace).asScala
     metadata match {
       case Some(ksMeta) => HTML_MAGIC + keyspaceDisplay.formatKeyspaceContent(describeKeyspace.statement, ksMeta,
-        session.getCluster.getConfiguration.getCodecRegistry)
+        session.getContext.getCodecRegistry)
       case None => throw new InterpreterException(s"Cannot find keyspace $keyspace")
     }
   }
 
+  private def getKeySpace(session: CqlSession): String = {
+    session.getKeyspace.asScala.map(_.asCql(true)).getOrElse("system")
+  }
+
   private def execute(describeTable: DescribeTableCmd): String = {
-    val metaData = session.getCluster.getMetadata
+    val metaData = session.getMetadata
     val tableName: String = describeTable.table
-    val keyspace: String = describeTable.keyspace.orElse(Option(session.getLoggedKeyspace)).getOrElse("system")
+    val keyspace: String = describeTable.keyspace.getOrElse(getKeySpace(session))
 
-    Option(metaData.getKeyspace(keyspace)).flatMap(ks => Option(ks.getTable(tableName))) match {
-      case Some(tableMeta) => HTML_MAGIC + tableDisplay.format(describeTable.statement, tableMeta, true)
+    metaData.getKeyspace(keyspace).asScala.flatMap(ks => ks.getTable(tableName).asScala) match {
+      case Some(tableMeta) => HTML_MAGIC + tableDisplay.format(describeTable.statement, tableMeta, withCaption = true)
       case None => throw new InterpreterException(s"Cannot find table $keyspace.$tableName")
     }
   }
 
   private def execute(describeUDT: DescribeTypeCmd): String = {
-    val metaData = session.getCluster.getMetadata
-    val keyspace: String = describeUDT.keyspace.orElse(Option(session.getLoggedKeyspace)).getOrElse("system")
+    val metaData = session.getMetadata
+    val keyspace: String = describeUDT.keyspace.getOrElse(getKeySpace(session))
     val udtName: String = describeUDT.udtName
 
-    Option(metaData.getKeyspace(keyspace)).flatMap(ks => Option(ks.getUserType(udtName))) match {
-      case Some(userType) => HTML_MAGIC + udtDisplay.format(describeUDT.statement, userType, true)
+    metaData.getKeyspace(keyspace).asScala.flatMap(ks => ks.getUserDefinedType(udtName).asScala) match {
+      case Some(userType) => HTML_MAGIC + udtDisplay.format(describeUDT.statement, userType, withCaption = true)
       case None => throw new InterpreterException(s"Cannot find type $keyspace.$udtName")
     }
   }
 
   private def execute(describeUDTs: DescribeTypesCmd): String = {
-    val metadata: Metadata = session.getCluster.getMetadata
+    val metadata: Metadata = session.getMetadata
     HTML_MAGIC + clusterDisplay.formatAllUDTs(describeUDTs.statement, metadata)
   }
 
   private def execute(describeFunction: DescribeFunctionCmd): String = {
-    val metaData = session.getCluster.getMetadata
-    val keyspaceName: String = describeFunction.keyspace.orElse(Option(session.getLoggedKeyspace)).getOrElse("system")
-    val functionName: String = describeFunction.function;
+    val metaData = session.getMetadata
+    val keyspaceName: String = describeFunction.keyspace.getOrElse(getKeySpace(session))
+    val functionName: String = describeFunction.function
 
-    Option(metaData.getKeyspace(keyspaceName)) match {
-      case Some(keyspace) => {
-        val functionMetas: List[FunctionMetadata] = keyspace.getFunctions.asScala.toList
-          .filter(func => func.getSimpleName.toLowerCase == functionName.toLowerCase)
+    metaData.getKeyspace(keyspaceName).asScala match {
+      case Some(keyspace) =>
+        val functionMetas: Seq[FunctionMetadata] = keyspace.getFunctions.asScala.toSeq
+          .filter { case (sig, _) => sig.getName.asCql(true).toLowerCase == functionName.toLowerCase}
+          .map(_._2)
 
         if(functionMetas.isEmpty) {
-          throw new InterpreterException(s"Cannot find function ${keyspaceName}.$functionName")
-        } else {
-          HTML_MAGIC + functionDisplay.format(describeFunction.statement, functionMetas, true)
+          throw new InterpreterException(s"Cannot find function $keyspaceName.$functionName")
         }
-      }
-      case None => throw new InterpreterException(s"Cannot find function ${keyspaceName}.$functionName")
+        HTML_MAGIC + functionDisplay.format(describeFunction.statement, functionMetas, withCaption = true)
+
+      case None =>
+        throw new InterpreterException(s"Cannot find function $keyspaceName.$functionName")
     }
   }
 
   private def execute(describeFunctions: DescribeFunctionsCmd): String = {
-    val metadata: Metadata = session.getCluster.getMetadata
+    val metadata: Metadata = session.getMetadata
     HTML_MAGIC + clusterDisplay.formatAllFunctions(describeFunctions.statement, metadata)
   }
 
   private def execute(describeAggregate: DescribeAggregateCmd): String = {
-    val metaData = session.getCluster.getMetadata
-    val keyspaceName: String = describeAggregate.keyspace.orElse(Option(session.getLoggedKeyspace)).getOrElse("system")
-    val aggregateName: String = describeAggregate.aggregate;
+    val metaData = session.getMetadata
+    val keyspaceName: String = describeAggregate.keyspace.getOrElse(getKeySpace(session))
+    val aggregateName: String = describeAggregate.aggregate
 
-    Option(metaData.getKeyspace(keyspaceName)) match {
-      case Some(keyspace) => {
-        val aggMetas: List[AggregateMetadata] = keyspace.getAggregates.asScala.toList
-          .filter(agg => agg.getSimpleName.toLowerCase == aggregateName.toLowerCase)
+    metaData.getKeyspace(keyspaceName).asScala match {
+      case Some(keyspace) =>
+        val aggMetas: Seq[AggregateMetadata] = keyspace.getAggregates.asScala.toSeq
+          .filter { case (sig, _) => sig.getName.asCql(true).toLowerCase == aggregateName.toLowerCase }
+          .map(_._2)
 
         if(aggMetas.isEmpty) {
-          throw new InterpreterException(s"Cannot find aggregate ${keyspaceName}.$aggregateName")
-        } else {
-          HTML_MAGIC + aggregateDisplay.format(describeAggregate.statement, aggMetas, true,
-            session
-            .getCluster
-            .getConfiguration
-            .getCodecRegistry)
+          throw new InterpreterException(s"Cannot find aggregate $keyspaceName.$aggregateName")
         }
-      }
-      case None => throw new InterpreterException(s"Cannot find aggregate ${keyspaceName}.$aggregateName")
+        HTML_MAGIC + aggregateDisplay.format(describeAggregate.statement, aggMetas, withCaption = true,
+          session.getContext.getCodecRegistry)
+
+      case None => throw new InterpreterException(s"Cannot find aggregate $keyspaceName.$aggregateName")
     }
   }
 
   private def execute(describeAggregates: DescribeAggregatesCmd): String = {
-    val metadata: Metadata = session.getCluster.getMetadata
+    val metadata: Metadata = session.getMetadata
     HTML_MAGIC + clusterDisplay.formatAllAggregates(describeAggregates.statement, metadata)
   }
 
   private def execute(describeMV: DescribeMaterializedViewCmd): String = {
-    val metaData = session.getCluster.getMetadata
-    val keyspaceName: String = describeMV.keyspace.orElse(Option(session.getLoggedKeyspace)).getOrElse("system")
+    val metaData = session.getMetadata
+    val keyspaceName: String = describeMV.keyspace.getOrElse(getKeySpace(session))
     val viewName: String = describeMV.view
 
-    Option(metaData.getKeyspace(keyspaceName)) match {
-      case Some(keyspace) => {
-        val viewMeta: Option[MaterializedViewMetadata] = Option(keyspace.getMaterializedView(viewName))
+    metaData.getKeyspace(keyspaceName).asScala match {
+      case Some(keyspace) =>
+        val viewMeta: Option[ViewMetadata] = keyspace.getView(viewName).asScala
         viewMeta match {
-          case Some(vMeta) => HTML_MAGIC + materializedViewDisplay.format(describeMV.statement, vMeta, true)
-          case None => throw new InterpreterException(s"Cannot find materialized view ${keyspaceName}.$viewName")
+          case Some(vMeta) => HTML_MAGIC + materializedViewDisplay.format(describeMV.statement, vMeta, withCaption = true)
+          case None => throw new InterpreterException(s"Cannot find materialized view $keyspaceName.$viewName")
         }
-      }
-      case None => throw new InterpreterException(s"Cannot find materialized view ${keyspaceName}.$viewName")
+
+      case None => throw new InterpreterException(s"Cannot find materialized view $keyspaceName.$viewName")
     }
   }
 
   private def execute(describeMVs: DescribeMaterializedViewsCmd): String = {
-    val metadata: Metadata = session.getCluster.getMetadata
+    val metadata: Metadata = session.getMetadata
     HTML_MAGIC + clusterDisplay.formatAllMaterializedViews(describeMVs.statement, metadata)
   }
 
@@ -184,38 +190,45 @@ class EnhancedSession(val session: Session) {
     HTML_MAGIC + helpDisplay.formatHelp()
   }
 
-
-  private def execute(st: Statement): Any = {
-    val rs = session.execute(st)
+  private def executeStatement[StatementT <: Statement[StatementT]](st: StatementT): Any = {
+    val rs: ResultSet = session.execute(st)
     if (EnhancedSession.isDDLStatement(st)) {
       if (!rs.getExecutionInfo.isSchemaInAgreement) {
-        val metadata = session.getCluster.getMetadata
-        while(!metadata.checkSchemaAgreement) {
+        val startTime = System.currentTimeMillis()
+        while (!session.checkSchemaAgreement()) {
           LOGGER.info("Schema is still not in agreement, waiting...")
-          Thread.sleep(DEFAULT_CHECK_TIME)
+          try {
+            Thread.sleep(DEFAULT_CHECK_TIME)
+          } catch {
+            case _: InterruptedException => () // ignore the interrupt and re-check schema agreement
+          }
+          val sinceStart = System.currentTimeMillis() - startTime
+          if (sinceStart > MAX_SCHEMA_AGREEMENT_WAIT) {
+            throw new RuntimeException(s"Can't achieve schema agreement after ${sinceStart / 1000} seconds")
+          }
         }
       }
     }
     rs
   }
 
-  def execute(st: Any): Any = {
+  def execute[StatementT <: Statement[StatementT]](st: Any): Any = {
     st match {
-      case x:DescribeClusterCmd => execute(x)
-      case x:DescribeKeyspaceCmd => execute(x)
-      case x:DescribeKeyspacesCmd => execute(x)
-      case x:DescribeTableCmd => execute(x)
-      case x:DescribeTablesCmd => execute(x)
-      case x:DescribeTypeCmd => execute(x)
-      case x:DescribeTypesCmd => execute(x)
-      case x:DescribeFunctionCmd => execute(x)
-      case x:DescribeFunctionsCmd => execute(x)
-      case x:DescribeAggregateCmd => execute(x)
-      case x:DescribeAggregatesCmd => execute(x)
-      case x:DescribeMaterializedViewCmd => execute(x)
-      case x:DescribeMaterializedViewsCmd => execute(x)
-      case x:HelpCmd => execute(x)
-      case x:Statement => execute(x)
+      case x: DescribeClusterCmd => execute(x)
+      case x: DescribeKeyspaceCmd => execute(x)
+      case x: DescribeKeyspacesCmd => execute(x)
+      case x: DescribeTableCmd => execute(x)
+      case x: DescribeTablesCmd => execute(x)
+      case x: DescribeTypeCmd => execute(x)
+      case x: DescribeTypesCmd => execute(x)
+      case x: DescribeFunctionCmd => execute(x)
+      case x: DescribeFunctionsCmd => execute(x)
+      case x: DescribeAggregateCmd => execute(x)
+      case x: DescribeAggregatesCmd => execute(x)
+      case x: DescribeMaterializedViewCmd => execute(x)
+      case x: DescribeMaterializedViewsCmd => execute(x)
+      case x: HelpCmd => execute(x)
+      case x: StatementT => executeStatement(x) // StatementT is erased, so this matches any driver Statement
       case _ => throw new InterpreterException(s"Cannot execute statement '$st' of type ${st.getClass}")
     }
   }
@@ -228,16 +241,44 @@ object EnhancedSession {
     DDL_REGEX.matcher(query.trim).matches()
   }
 
-  def isDDLStatement(st: Statement): Boolean = {
+  def isDDLStatement[StatementT <: Statement[StatementT]](st: StatementT): Boolean = {
     st match {
-      case x:BoundStatement =>
-        isDDLStatement(x.preparedStatement.getQueryString)
-      case x:BatchStatement =>
-        x.getStatements.asScala.seq.exists(isDDLStatement)
-      case x:RegularStatement =>
-        isDDLStatement(x.getQueryString)
+      case x: BoundStatement =>
+        isDDLStatement(x.getPreparedStatement.getQuery)
+      case x: BatchStatement =>
+        x.iterator().asScala.toSeq.exists {
+          case t: BoundStatement => isDDLStatement(t.getPreparedStatement.getQuery)
+          case t: SimpleStatement => isDDLStatement(t.getQuery)
+        }
+      case x: SimpleStatement =>
+        isDDLStatement(x.getQuery)
      case _ => // should only happen for StatementWrapper
         true
     }
   }
+
+  def getCqlStatement[StatementT <: Statement[StatementT]](st: StatementT): String = {
+    st match {
+      case x: BoundStatement =>
+        x.getPreparedStatement.getQuery
+      case x: BatchStatement =>
+        val batchType = x.getBatchType
+        val timestamp = x.getQueryTimestamp
+        val timestampStr = if (timestamp == Long.MinValue) "" else " USING TIMESTAMP " + timestamp
+        val batchTypeStr = if (batchType == BatchType.COUNTER) "COUNTER "
+        else if (batchType == BatchType.UNLOGGED) "UNLOGGED "
+        else ""
+
+        "BEGIN " + batchTypeStr + "BATCH" + timestampStr + "\n" +
+          x.iterator().asScala.toSeq.map {
+            case t: BoundStatement => t.getPreparedStatement.getQuery
+            case t: SimpleStatement => t.getQuery
+          }.mkString("\n") + "\nAPPLY BATCH;"
+      case x: SimpleStatement =>
+        x.getQuery
+      case _ =>
+        ""
+    }
+  }
+
 }
\ No newline at end of file
diff --git a/cassandra/src/main/scala/org/apache/zeppelin/cassandra/InterpreterLogic.scala b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/InterpreterLogic.scala
index a871abd..bd37c45 100644
--- a/cassandra/src/main/scala/org/apache/zeppelin/cassandra/InterpreterLogic.scala
+++ b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/InterpreterLogic.scala
@@ -20,24 +20,27 @@ import java.io.{ByteArrayOutputStream, PrintStream}
 import java.net.InetAddress
 import java.nio.ByteBuffer
 import java.text.SimpleDateFormat
+import java.time.format.DateTimeFormatter
+import java.time.{Duration, Instant, LocalDateTime, ZoneOffset}
 import java.util
-import java.util.Date
 import java.util.concurrent.ConcurrentHashMap
 
-import com.datastax.driver.core.DataType.Name._
-import com.datastax.driver.core._
-import com.datastax.driver.core.exceptions.DriverException
-import com.datastax.driver.core.policies.{LoggingRetryPolicy, FallthroughRetryPolicy, DowngradingConsistencyRetryPolicy, Policies}
+import com.datastax.oss.driver.api.core.`type`.{DataType, ListType, MapType, SetType, TupleType, UserDefinedType}
+import com.datastax.oss.driver.api.core.`type`.DataTypes._
+import com.datastax.oss.driver.api.core.`type`.codec.TypeCodec
+import com.datastax.oss.driver.api.core.`type`.codec.registry.CodecRegistry
+import com.datastax.oss.driver.api.core.cql.{BatchStatement, BatchType, BatchableStatement, BoundStatement, ExecutionInfo, PreparedStatement, ResultSet, Row, SimpleStatement, Statement}
+import com.datastax.oss.driver.api.core.{ConsistencyLevel, CqlSession, DriverException, ProtocolVersion}
 import org.apache.zeppelin.cassandra.TextBlockHierarchy._
-import org.apache.zeppelin.display.AngularObjectRegistry
 import org.apache.zeppelin.display.ui.OptionInput.ParamOption
 import org.apache.zeppelin.interpreter.InterpreterResult.Code
-import org.apache.zeppelin.interpreter.{InterpreterException, InterpreterResult, InterpreterContext}
-import org.slf4j.LoggerFactory
-import scala.collection.JavaConversions._
+import org.apache.zeppelin.interpreter.{InterpreterContext, InterpreterException, InterpreterResult}
+import org.slf4j.{Logger, LoggerFactory}
+
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
+import scala.util.matching.Regex
 
 
 /**
@@ -46,14 +49,12 @@ import scala.collection.mutable.ArrayBuffer
   * @param consistency consistency level
  * @param serialConsistency serial consistency level
  * @param timestamp timestamp
- * @param retryPolicy retry policy
  * @param fetchSize query fetch size
  * @param requestTimeOut request time out in millisecs
  */
 case class CassandraQueryOptions(consistency: Option[ConsistencyLevel],
                                  serialConsistency:Option[ConsistencyLevel],
                                  timestamp: Option[Long],
-                                 retryPolicy: Option[RetryPolicy],
                                  fetchSize: Option[Int],
                                  requestTimeOut: Option[Int])
 
@@ -63,27 +64,24 @@ case class CassandraQueryOptions(consistency: Option[ConsistencyLevel],
 object InterpreterLogic {
   
   val CHOICES_SEPARATOR : String = """\|"""
-  val VARIABLE_PATTERN = """\{\{[^}]+\}\}""".r
-  val SIMPLE_VARIABLE_DEFINITION_PATTERN = """\{\{([^=]+)=([^=]+)\}\}""".r
-  val MULTIPLE_CHOICES_VARIABLE_DEFINITION_PATTERN = """\{\{([^=]+)=((?:[^=]+\|)+[^|]+)\}\}""".r
+  val VARIABLE_PATTERN: Regex = """\{\{[^}]+\}\}""".r
+  val SIMPLE_VARIABLE_DEFINITION_PATTERN: Regex = """\{\{([^=]+)=([^=]+)\}\}""".r
+  val MULTIPLE_CHOICES_VARIABLE_DEFINITION_PATTERN: Regex = """\{\{([^=]+)=((?:[^=]+\|)+[^|]+)\}\}""".r
 
   val STANDARD_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss"
   val ACCURATE_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss.SSS"
+  // TODO(alex): add more time formatters, like, ISO...
+  val STANDARD_DATE_FORMATTER: DateTimeFormatter = DateTimeFormatter.ofPattern(STANDARD_DATE_FORMAT)
+  val ACCURATE_DATE_FORMATTER: DateTimeFormatter = DateTimeFormatter.ofPattern(ACCURATE_DATE_FORMAT)
 
-  val defaultRetryPolicy = Policies.defaultRetryPolicy()
-  val downgradingConsistencyRetryPolicy = DowngradingConsistencyRetryPolicy.INSTANCE
-  val fallThroughRetryPolicy = FallthroughRetryPolicy.INSTANCE
-  val loggingDefaultRetryPolicy = new LoggingRetryPolicy(defaultRetryPolicy)
-  val loggingDownGradingRetryPolicy = new LoggingRetryPolicy(downgradingConsistencyRetryPolicy)
-  val loggingFallThroughRetryPolicy = new LoggingRetryPolicy(fallThroughRetryPolicy)
 
-  val preparedStatements : mutable.Map[String,PreparedStatement] = new ConcurrentHashMap[String,PreparedStatement]().asScala
+  val preparedStatements: mutable.Map[String, PreparedStatement] = new ConcurrentHashMap[String, PreparedStatement]().asScala
 
-  val logger = LoggerFactory.getLogger(classOf[InterpreterLogic])
+  val logger: Logger = LoggerFactory.getLogger(classOf[InterpreterLogic])
 
   val paragraphParser = new ParagraphParser
   val boundValuesParser = new BoundValuesParser
-  
+
 }
 
 /**
@@ -93,18 +91,19 @@ object InterpreterLogic {
  *
  * @param session java driver session
  */
-class InterpreterLogic(val session: Session)  {
+class InterpreterLogic(val session: CqlSession)  {
 
   val enhancedSession: EnhancedSession = new EnhancedSession(session)
 
   import InterpreterLogic._
 
-  def interpret(session:Session, stringStatements : String, context: InterpreterContext): InterpreterResult = {
+  def interpret[StatementT <: Statement[StatementT]](session: CqlSession, stringStatements: String,
+                                                     context: InterpreterContext): InterpreterResult = {
 
     logger.info(s"Executing CQL statements : \n\n$stringStatements\n")
 
     try {
-      val protocolVersion = session.getCluster.getConfiguration.getProtocolOptions.getProtocolVersion
+      val protocolVersion = session.getContext.getProtocolVersion
 
       val queries:List[AnyBlock] = parseInput(stringStatements)
 
@@ -131,61 +130,64 @@ class InterpreterLogic(val session: Session)  {
         .map(_.getStatement[PrepareStm])
         .foreach(statement => {
           logger.debug(s"Get or prepare statement '${statement.name}' : ${statement.query}")
-          preparedStatements.getOrElseUpdate(statement.name,session.prepare(statement.query))
+          preparedStatements.getOrElseUpdate(statement.name, session.prepare(statement.query))
         })
 
-      val statements: List[Any] = queryStatements
+      val statements: Seq[Any] = queryStatements
         .filter(st => (st.statementType != PrepareStatementType) && (st.statementType != RemovePrepareStatementType))
-        .map{
-          case x:SimpleStm => generateSimpleStatement(x, queryOptions, context)
-          case x:BatchStm => {
-            val builtStatements: List[Statement] = x.statements.map {
-              case st:SimpleStm => generateSimpleStatement(st, queryOptions, context)
-              case st:BoundStm => generateBoundStatement(session, st, queryOptions, context)
+        .map {
+          case x: SimpleStm =>
+            generateSimpleStatement(x, queryOptions, context)
+
+          case x: BatchStm =>
+            val builtStatements = x.statements.map {
+              case st: SimpleStm => generateSimpleStatement(st, queryOptions, context)
+              case st: BoundStm => generateBoundStatement(session, st, queryOptions, context)
               case _ => throw new InterpreterException(s"Unknown statement type")
             }
             generateBatchStatement(x.batchType, queryOptions, builtStatements)
-          }
-          case x:BoundStm => generateBoundStatement(session, x, queryOptions, context)
-          case x:DescribeCommandStatement => x
-          case x:HelpCmd => x
-          case x => throw new InterpreterException(s"Unknown statement type : ${x}")
+
+          case x: BoundStm =>
+            generateBoundStatement(session, x, queryOptions, context)
+          case x: DescribeCommandStatement => x
+          case x: HelpCmd => x
+          case x => throw new InterpreterException(s"Unknown statement type : $x")
        }
 
-      val results: List[(Any,Any)] = for (statement <- statements) yield (enhancedSession.execute(statement),statement)
+      val results: Seq[(Any,Any)] = for (statement <- statements)
+        yield (enhancedSession.execute(statement),statement)
 
       if (results.nonEmpty) {
         results.last match {
-          case(res: ResultSet, st: Statement) => buildResponseMessage((res, st), protocolVersion)
+          case(res: ResultSet, st: StatementT) =>
+            buildResponseMessage((res, st), protocolVersion)
           case(output: String, _) => new InterpreterResult(Code.SUCCESS, output)
           case _ => throw new InterpreterException(s"Cannot parse result type : ${results.last}")
         }
-
       } else {
         new InterpreterResult(Code.SUCCESS, enhancedSession.displayNoResult)
       }
-
     } catch {
-      case dex: DriverException => {
+      case dex: DriverException =>
         logger.error(dex.getMessage, dex)
         new InterpreterResult(Code.ERROR, parseException(dex))
-      }
-      case pex:ParsingException => {
+
+      case pex:ParsingException =>
         logger.error(pex.getMessage, pex)
         new InterpreterResult(Code.ERROR, pex.getMessage)
-      }
-      case iex: InterpreterException => {
+
+      case iex: InterpreterException =>
         logger.error(iex.getMessage, iex)
         new InterpreterResult(Code.ERROR, iex.getMessage)
-      }
-      case ex: java.lang.Exception => {
+
+      case ex: java.lang.Exception =>
         logger.error(ex.getMessage, ex)
         new InterpreterResult(Code.ERROR, parseException(ex))
-      }
     }
   }
 
-  def buildResponseMessage(lastResultSet: (ResultSet,Statement), protocolVersion: ProtocolVersion): InterpreterResult = {
+  def buildResponseMessage[StatementT <: Statement[StatementT]](lastResultSet: (ResultSet, StatementT),
+                                                                protocolVersion: ProtocolVersion): InterpreterResult = {
     val output = new StringBuilder()
     val rows: collection.mutable.ArrayBuffer[Row] = ArrayBuffer()
 
@@ -196,10 +198,9 @@ class InterpreterLogic(val session: Session)  {
 
     val columnsDefinitions: List[(String, DataType)] = lastResultSet._1
       .getColumnDefinitions
-      .asList
-      .toList // Java list -> Scala list
-      .map(definition => (definition.getName, definition.getType))
-
+      .asScala
+      .toList
+      .map(definition => (definition.getName.asCql(true), definition.getType))
 
     if (rows.nonEmpty) {
       // Create table headers
@@ -211,15 +212,19 @@ class InterpreterLogic(val session: Session)  {
       rows.foreach {
         row => {
           val data = columnsDefinitions.map {
-            case (name, dataType) => {
-              if (row.isNull(name)) null else row.getObject(name)
-            }
+            case (name, dataType) =>
+              if (row.isNull(name)) {
+                null
+              } else {
+                val value = row.getObject(name)
+                row.codecRegistry().codecFor(dataType, value).format(value)
+              }
           }
           output.append(data.mkString("\t")).append("\n")
         }
       }
     } else {
-      val lastQuery: String = lastResultSet._2.toString
+      val lastQuery: String = EnhancedSession.getCqlStatement(lastResultSet._2)
       val executionInfo: ExecutionInfo = lastResultSet._1.getExecutionInfo
       output.append(enhancedSession.displayExecutionStatistics(lastQuery, executionInfo))
     }
@@ -232,14 +237,17 @@ class InterpreterLogic(val session: Session)  {
   def parseInput(input:String): List[AnyBlock] = {
     val parsingResult: ParagraphParser#ParseResult[List[AnyBlock]] = paragraphParser.parseAll(paragraphParser.queries, input)
     parsingResult match {
-      case paragraphParser.Success(blocks,_) => blocks
-      case paragraphParser.Failure(msg,next) => {
+      case paragraphParser.Success(blocks,_) =>
+        blocks
+
+      case paragraphParser.Failure(_,_) =>
         throw new InterpreterException(s"Error parsing input:\n\t'$input'\nDid you forget to add ; (semi-colon) at the end of each CQL statement ?")
-      }
-      case paragraphParser.Error(msg,next) => {
+
+      case paragraphParser.Error(_,_) =>
         throw new InterpreterException(s"Error parsing input:\n\t'$input'\nDid you forget to add ; (semi-colon) at the end of each CQL statement ?")
-      }
-      case _ => throw new InterpreterException(s"Error parsing input: $input")
+
+      case _ =>
+        throw new InterpreterException(s"Error parsing input: $input")
     }
   }
 
@@ -266,11 +274,6 @@ class InterpreterLogic(val session: Session)  {
       .flatMap(x => Option(x.value))
       .headOption
 
-    val retryPolicy: Option[RetryPolicy] = parameters
-      .filter(_.paramType == RetryPolicyParam)
-      .map(_.getParam[RetryPolicy])
-      .headOption
-
     val fetchSize: Option[Int] = parameters
       .filter(_.paramType == FetchSizeParam)
       .map(_.getParam[FetchSize])
@@ -283,34 +286,34 @@ class InterpreterLogic(val session: Session)  {
       .flatMap(x => Option(x.value))
       .headOption
 
-    CassandraQueryOptions(consistency,serialConsistency, timestamp, retryPolicy, fetchSize, requestTimeOut)
+    CassandraQueryOptions(consistency, serialConsistency, timestamp, fetchSize, requestTimeOut)
   }
 
   def generateSimpleStatement(st: SimpleStm, options: CassandraQueryOptions,context: InterpreterContext): SimpleStatement = {
     logger.debug(s"Generating simple statement : '${st.text}'")
-    val statement = new SimpleStatement(maybeExtractVariables(st.text, context))
+    val statement = SimpleStatement.newInstance(maybeExtractVariables(st.text, context))
     applyQueryOptions(options, statement)
-    statement
   }
 
-  def generateBoundStatement(session: Session, st: BoundStm, options: CassandraQueryOptions,context: InterpreterContext): BoundStatement = {
+  def generateBoundStatement(session: CqlSession, st: BoundStm, options: CassandraQueryOptions,context: InterpreterContext): BoundStatement = {
     logger.debug(s"Generating bound statement with name : '${st.name}' and bound values : ${st.values}")
     preparedStatements.get(st.name) match {
-      case Some(ps) => {
+      case Some(ps) =>
         val boundValues = maybeExtractVariables(st.values, context)
-        createBoundStatement(session.getCluster.getConfiguration.getCodecRegistry, st.name, ps, boundValues)
-      }
-      case None => throw new InterpreterException(s"The statement '${st.name}' can not be bound to values. " +
+        createBoundStatement(session.getContext.getCodecRegistry, st.name, ps, boundValues)
+
+      case None =>
+        throw new InterpreterException(s"The statement '${st.name}' can not be bound to values. " +
           s"Are you sure you did prepare it with @prepare[${st.name}] ?")
     }
   }
 
-  def generateBatchStatement(batchType: BatchStatement.Type, options: CassandraQueryOptions, statements: List[Statement]): BatchStatement = {
-    logger.debug(s"""Generating batch statement of type '${batchType} for ${statements.mkString(",")}'""")
-    val batch = new BatchStatement(batchType)
-    statements.foreach(batch.add(_))
+  def generateBatchStatement(batchType: BatchType,
+                             options: CassandraQueryOptions,
+                             statements: Seq[BatchableStatement[_]]): BatchStatement = {
+    logger.debug(s"""Generating batch statement of type '$batchType' for ${statements.mkString(",")}""")
+    val batch = BatchStatement.newInstance(batchType).addAll(statements:_*)
     applyQueryOptions(options, batch)
-    batch
   }
 
   def maybeExtractVariables(statement: String, context: InterpreterContext): String = {
@@ -324,57 +327,46 @@ class InterpreterLogic(val session: Session)  {
       paragraphScoped
     }
 
-    def extractVariableAndDefaultValue(statement: String, exp: String):String = {
-      exp match {
-        case MULTIPLE_CHOICES_VARIABLE_DEFINITION_PATTERN(variable, choices) => {
-          val escapedExp: String = exp.replaceAll( """\{""", """\\{""").replaceAll( """\}""", """\\}""").replaceAll("""\|""","""\\|""")
-          findInAngularRepository(variable) match {
-            case Some(value) => statement.replaceAll(escapedExp,value.toString)
-            case None => {
-              val listChoices:List[String] = choices.trim.split(CHOICES_SEPARATOR).toList
-              val paramOptions = listChoices.map(choice => new ParamOption(choice, choice))
-              val selected = context.getGui.select(variable, paramOptions.toArray, listChoices.head)
-              statement.replaceAll(escapedExp,selected.toString)
-            }
-          }
+    def extractVariableAndDefaultValue(statement: String, exp: String): String = exp match {
+      case MULTIPLE_CHOICES_VARIABLE_DEFINITION_PATTERN(variable, choices) =>
+        val escapedExp: String = exp.replaceAll( """\{""", """\\{""").replaceAll( """\}""", """\\}""").replaceAll("""\|""","""\\|""")
+        findInAngularRepository(variable) match {
+          case Some(value) => statement.replaceAll(escapedExp,value.toString)
+          case None =>
+            val listChoices:List[String] = choices.trim.split(CHOICES_SEPARATOR).toList
+            val paramOptions = listChoices.map(choice => new ParamOption(choice, choice))
+            val selected = context.getGui.select(variable, paramOptions.toArray, listChoices.head)
+            statement.replaceAll(escapedExp,selected.toString)
         }
-        case SIMPLE_VARIABLE_DEFINITION_PATTERN(variable,defaultVal) => {
-          val escapedExp: String = exp.replaceAll( """\{""", """\\{""").replaceAll( """\}""", """\\}""")
-          findInAngularRepository(variable) match {
-            case Some(value) => statement.replaceAll(escapedExp,value.toString)
-            case None => {
-              val value = context.getGui.input(variable,defaultVal)
-              statement.replaceAll(escapedExp,value.toString)
-            }
-          }
+
+      case SIMPLE_VARIABLE_DEFINITION_PATTERN(variable,defaultVal) =>
+        val escapedExp: String = exp.replaceAll( """\{""", """\\{""").replaceAll( """\}""", """\\}""")
+        findInAngularRepository(variable) match {
+          case Some(value) => statement.replaceAll(escapedExp,value.toString)
+          case None =>
+            val value = context.getGui.input(variable,defaultVal)
+            statement.replaceAll(escapedExp,value.toString)
         }
-        case _ => throw new ParsingException(s"Invalid bound variable definition for '$exp' in '$statement'. It should be of form 'variable=defaultValue' or 'variable=value1|value2|...|valueN'")
-      }
+
+      case _ =>
+        throw new ParsingException(s"Invalid bound variable definition for '$exp' in '$statement'. It should be of form 'variable=defaultValue' or 'variable=value1|value2|...|valueN'")
     }
 
-    VARIABLE_PATTERN.findAllIn(statement).foldLeft(statement)(extractVariableAndDefaultValue _)
+    VARIABLE_PATTERN.findAllIn(statement).foldLeft(statement)(extractVariableAndDefaultValue)
   }
 
-  def applyQueryOptions(options: CassandraQueryOptions, statement: Statement): Unit = {
-    options.consistency.foreach(statement.setConsistencyLevel(_))
-    options.serialConsistency.foreach(statement.setSerialConsistencyLevel(_))
-    options.timestamp.foreach(statement.setDefaultTimestamp(_))
-    options.retryPolicy.foreach {
-      case DefaultRetryPolicy => statement.setRetryPolicy(defaultRetryPolicy)
-      case DowngradingRetryPolicy => statement.setRetryPolicy(downgradingConsistencyRetryPolicy)
-      case FallThroughRetryPolicy => statement.setRetryPolicy(fallThroughRetryPolicy)
-      case LoggingDefaultRetryPolicy => statement.setRetryPolicy(loggingDefaultRetryPolicy)
-      case LoggingDowngradingRetryPolicy => statement.setRetryPolicy(loggingDownGradingRetryPolicy)
-      case LoggingFallThroughRetryPolicy => statement.setRetryPolicy(loggingFallThroughRetryPolicy)
-      case _ => throw new InterpreterException(s"""Unknown retry policy ${options.retryPolicy.getOrElse("???")}""")
-    }
-    options.fetchSize.foreach(statement.setFetchSize(_))
-    options.requestTimeOut.foreach(statement.setReadTimeoutMillis(_))
+  def applyQueryOptions[StatementT <: Statement[StatementT]](options: CassandraQueryOptions, statement: StatementT): StatementT = {
+    val stmt1: StatementT = if (options.consistency.isDefined) statement.setConsistencyLevel(options.consistency.get) else statement
+    val stmt2: StatementT = if (options.serialConsistency.isDefined) stmt1.setSerialConsistencyLevel(options.serialConsistency.get) else stmt1
+    val stmt3: StatementT = if (options.timestamp.isDefined) stmt2.setQueryTimestamp(options.timestamp.get) else stmt2
+    val stmt4: StatementT = if (options.fetchSize.isDefined) stmt3.setPageSize(options.fetchSize.get) else stmt3
+    val stmt5: StatementT = if (options.requestTimeOut.isDefined) stmt4.setTimeout(Duration.ofMillis(options.requestTimeOut.get)) else stmt4
+    stmt5
   }
 
-  private def createBoundStatement(codecRegistry: CodecRegistry, name: String, ps: PreparedStatement, rawBoundValues: String): BoundStatement = {
-    val dataTypes = ps.getVariables.toList
-      .map(cfDef => cfDef.getType)
+  private def createBoundStatement(codecRegistry: CodecRegistry, name: String, ps: PreparedStatement,
+                                   rawBoundValues: String): BoundStatement = {
+    val dataTypes = ps.getVariableDefinitions.iterator.asScala.toSeq.map(cfDef => cfDef.getType)
 
     val boundValuesAsText = parseBoundValues(name,rawBoundValues)
 
@@ -383,15 +375,15 @@ class InterpreterLogic(val session: Session)  {
 
     val convertedValues: List[AnyRef] = boundValuesAsText
       .zip(dataTypes).map {
-        case (value, dataType) => {
+        case (value, dataType) =>
           if(value.trim == "null") {
             null
           } else {
             val codec: TypeCodec[AnyRef] = codecRegistry.codecFor[AnyRef](dataType)
-            dataType.getName match {
-            case (ASCII | TEXT | VARCHAR) => value.trim.replaceAll("(?<!')'","")
-            case (INT | VARINT) => value.trim.toInt
-            case (BIGINT | COUNTER) => value.trim.toLong
+            dataType match {
+            case ASCII | TEXT  => value.trim.replaceAll("(?<!')'","")
+            case INT | VARINT => value.trim.toInt
+            case BIGINT | COUNTER => value.trim.toLong
             case BLOB => ByteBuffer.wrap(value.trim.getBytes)
             case BOOLEAN => value.trim.toBoolean
             case DECIMAL => BigDecimal(value.trim)
@@ -399,16 +391,15 @@ class InterpreterLogic(val session: Session)  {
             case FLOAT => value.trim.toFloat
             case INET => InetAddress.getByName(value.trim)
             case TIMESTAMP => parseDate(value.trim)
-            case (UUID | TIMEUUID) => java.util.UUID.fromString(value.trim)
-            case LIST => codec.parse(boundValuesParser.parse(boundValuesParser.list, value).get)
-            case SET => codec.parse(boundValuesParser.parse(boundValuesParser.set, value).get)
-            case MAP => codec.parse(boundValuesParser.parse(boundValuesParser.map, value).get)
-            case UDT => codec.parse(boundValuesParser.parse(boundValuesParser.udt, value).get)
-            case TUPLE => codec.parse(boundValuesParser.parse(boundValuesParser.tuple, value).get)
+            case UUID | TIMEUUID => java.util.UUID.fromString(value.trim)
+            case _: ListType => codec.parse(boundValuesParser.parse(boundValuesParser.list, value).get)
+            case _: SetType => codec.parse(boundValuesParser.parse(boundValuesParser.set, value).get)
+            case _: MapType => codec.parse(boundValuesParser.parse(boundValuesParser.map, value).get)
+            case _: UserDefinedType => codec.parse(boundValuesParser.parse(boundValuesParser.udt, value).get)
+            case _: TupleType => codec.parse(boundValuesParser.parse(boundValuesParser.tuple, value).get)
             case _ => throw new InterpreterException(s"Cannot parse data of type : ${dataType.toString}")
           }
         }
-      }
     }.asInstanceOf[List[AnyRef]]
 
     ps.bind(convertedValues.toArray: _*)
@@ -423,13 +414,15 @@ class InterpreterLogic(val session: Session)  {
     }
   }
 
-  def parseDate(dateString: String): Date = {
-    dateString match {
-      case boundValuesParser.STANDARD_DATE_PATTERN(datePattern) => new SimpleDateFormat(STANDARD_DATE_FORMAT).parse(datePattern)
-      case boundValuesParser.ACCURATE_DATE_PATTERN(datePattern) => new SimpleDateFormat(ACCURATE_DATE_FORMAT).parse(datePattern)
+  def parseDate(dateString: String): Instant = {
+    val formatter = dateString match {
+      case boundValuesParser.STANDARD_DATE_PATTERN(_) => STANDARD_DATE_FORMATTER
+      case boundValuesParser.ACCURATE_DATE_PATTERN(_) => ACCURATE_DATE_FORMATTER
       case _ => throw new InterpreterException(s"Cannot parse date '$dateString'. " +
         s"Accepted formats : $STANDARD_DATE_FORMAT OR $ACCURATE_DATE_FORMAT");
     }
+    // TODO(alex): check about timezone...
+    LocalDateTime.parse(dateString, formatter).toInstant(ZoneOffset.UTC)
   }
 
   def parseException(ex: Exception): String = {
diff --git a/cassandra/src/main/scala/org/apache/zeppelin/cassandra/JavaDriverConfig.scala b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/JavaDriverConfig.scala
index 865d89f..1834e7e 100644
--- a/cassandra/src/main/scala/org/apache/zeppelin/cassandra/JavaDriverConfig.scala
+++ b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/JavaDriverConfig.scala
@@ -18,296 +18,130 @@ package org.apache.zeppelin.cassandra
 
 import java.lang.Boolean._
 
-import com.datastax.driver.core.HostDistance._
-import com.datastax.driver.core.ProtocolOptions.Compression
-import com.datastax.driver.core._
-import com.datastax.driver.core.policies._
+import com.datastax.oss.driver.api.core.ProtocolVersion
+import com.datastax.oss.driver.api.core.config.{DefaultDriverOption, ProgrammaticDriverConfigLoaderBuilder}
 import org.apache.commons.lang3.StringUtils._
 import org.apache.zeppelin.interpreter.Interpreter
 import org.apache.zeppelin.cassandra.CassandraInterpreter._
-import org.slf4j.LoggerFactory
+import org.slf4j.{Logger, LoggerFactory}
 
 /**
  * Utility class to extract and configure the Java driver
  */
 class JavaDriverConfig {
+  val LOGGER: Logger = LoggerFactory.getLogger(classOf[JavaDriverConfig])
 
-  val LOGGER = LoggerFactory.getLogger(classOf[JavaDriverConfig])
+  def setSocketOptions(intpr: Interpreter, configBuilder: ProgrammaticDriverConfigLoaderBuilder): Unit = {
+    val connectTimeoutMillis: Int = intpr.getProperty(CASSANDRA_SOCKET_CONNECTION_TIMEOUT_MILLIS,
+      CassandraInterpreter.DEFAULT_CONNECTION_TIMEOUT).toInt
+    configBuilder.withInt(DefaultDriverOption.CONNECTION_INIT_QUERY_TIMEOUT, connectTimeoutMillis)
+    configBuilder.withInt(DefaultDriverOption.CONTROL_CONNECTION_TIMEOUT, connectTimeoutMillis)
 
-  def getSocketOptions(intpr: Interpreter): SocketOptions = {
-    val socketOptions: SocketOptions = new SocketOptions
-    val socketOptionsInfo: StringBuilder = new StringBuilder("Socket options : \n\n")
+    val readTimeoutMillis: Int = intpr.getProperty(CASSANDRA_SOCKET_READ_TIMEOUT_MILLIS,
+      CassandraInterpreter.DEFAULT_READ_TIMEOUT).toInt
+    configBuilder.withInt(DefaultDriverOption.REQUEST_TIMEOUT, readTimeoutMillis)
 
-    val connectTimeoutMillis: Int = intpr.getProperty(CASSANDRA_SOCKET_CONNECTION_TIMEOUT_MILLIS).toInt
-    socketOptions.setConnectTimeoutMillis(connectTimeoutMillis)
-    socketOptionsInfo
-      .append("\t")
-      .append(CASSANDRA_SOCKET_CONNECTION_TIMEOUT_MILLIS)
-      .append(" : ")
-      .append(connectTimeoutMillis).append("\n")
-
-    val readTimeoutMillis: Int = intpr.getProperty(CASSANDRA_SOCKET_READ_TIMEOUT_MILLIS).toInt
-    socketOptions.setReadTimeoutMillis(readTimeoutMillis)
-    socketOptionsInfo
-      .append("\t")
-      .append(CASSANDRA_SOCKET_READ_TIMEOUT_MILLIS)
-      .append(" : ")
-      .append(readTimeoutMillis).append("\n")
-
-    val tcpNoDelay: Boolean = parseBoolean(intpr.getProperty(CASSANDRA_SOCKET_TCP_NO_DELAY))
-    socketOptions.setTcpNoDelay(tcpNoDelay)
-    socketOptionsInfo
-      .append("\t")
-      .append(CASSANDRA_SOCKET_TCP_NO_DELAY)
-      .append(" : ")
-      .append(tcpNoDelay)
-      .append("\n")
+    val tcpNoDelay = intpr.getProperty(CASSANDRA_SOCKET_TCP_NO_DELAY, CassandraInterpreter.DEFAULT_TCP_NO_DELAY)
+    if (isNotBlank(tcpNoDelay)) {
+      configBuilder.withBoolean(DefaultDriverOption.SOCKET_TCP_NODELAY, parseBoolean(tcpNoDelay))
+    }
 
     val keepAlive: String = intpr.getProperty(CASSANDRA_SOCKET_KEEP_ALIVE)
     if (isNotBlank(keepAlive)) {
-      val keepAliveValue: Boolean = parseBoolean(keepAlive)
-      socketOptions.setKeepAlive(keepAliveValue)
-      socketOptionsInfo
-        .append("\t")
-        .append(CASSANDRA_SOCKET_KEEP_ALIVE)
-        .append(" : ")
-        .append(keepAliveValue).append("\n")
+      configBuilder.withBoolean(DefaultDriverOption.SOCKET_KEEP_ALIVE, parseBoolean(keepAlive))
     }
 
     val receivedBuffSize: String = intpr.getProperty(CASSANDRA_SOCKET_RECEIVED_BUFFER_SIZE_BYTES)
     if (isNotBlank(receivedBuffSize)) {
-      val receiveBufferSizeValue: Int = receivedBuffSize.toInt
-      socketOptions.setReceiveBufferSize(receiveBufferSizeValue)
-      socketOptionsInfo
-        .append("\t")
-        .append(CASSANDRA_SOCKET_RECEIVED_BUFFER_SIZE_BYTES)
-        .append(" : ")
-        .append(receiveBufferSizeValue)
-        .append("\n")
+      configBuilder.withInt(DefaultDriverOption.SOCKET_RECEIVE_BUFFER_SIZE, receivedBuffSize.toInt)
     }
 
     val sendBuffSize: String = intpr.getProperty(CASSANDRA_SOCKET_SEND_BUFFER_SIZE_BYTES)
     if (isNotBlank(sendBuffSize)) {
-      val sendBufferSizeValue: Int = sendBuffSize.toInt
-      socketOptions.setSendBufferSize(sendBufferSizeValue)
-      socketOptionsInfo
-        .append("\t")
-        .append(CASSANDRA_SOCKET_SEND_BUFFER_SIZE_BYTES)
-        .append(" : ")
-        .append(sendBufferSizeValue)
-        .append("\n")
+      configBuilder.withInt(DefaultDriverOption.SOCKET_SEND_BUFFER_SIZE, sendBuffSize.toInt)
     }
 
     val reuseAddress: String = intpr.getProperty(CASSANDRA_SOCKET_REUSE_ADDRESS)
     if (isNotBlank(reuseAddress)) {
-      val reuseAddressValue: Boolean = parseBoolean(reuseAddress)
-      socketOptions.setReuseAddress(reuseAddressValue)
-      socketOptionsInfo
-        .append("\t")
-        .append(CASSANDRA_SOCKET_REUSE_ADDRESS)
-        .append(" : ")
-        .append(reuseAddressValue)
-        .append("\n")
+      configBuilder.withBoolean(DefaultDriverOption.SOCKET_REUSE_ADDRESS, parseBoolean(reuseAddress))
     }
 
     val soLinger: String = intpr.getProperty(CASSANDRA_SOCKET_SO_LINGER)
     if (isNotBlank(soLinger)) {
-      val soLingerValue: Int = soLinger.toInt
-      socketOptions.setSoLinger(soLingerValue)
-      socketOptionsInfo
-        .append("\t")
-        .append(CASSANDRA_SOCKET_SO_LINGER)
-        .append(" : ")
-        .append(soLingerValue)
-        .append("\n")
+      configBuilder.withInt(DefaultDriverOption.SOCKET_LINGER_INTERVAL, soLinger.toInt)
     }
-
-    LOGGER.debug(socketOptionsInfo.append("\n").toString)
-
-    return socketOptions
   }
 
-  def getQueryOptions(intpr: Interpreter): QueryOptions = {
-    val queryOptions: QueryOptions = new QueryOptions
-    val queryOptionsInfo: StringBuilder = new StringBuilder("Query options : \n\n")
+  def setQueryOptions(intpr: Interpreter, configBuilder: ProgrammaticDriverConfigLoaderBuilder): Unit = {
+    val consistencyLevel = intpr.getProperty(CASSANDRA_QUERY_DEFAULT_CONSISTENCY,
+      CassandraInterpreter.DEFAULT_CONSISTENCY)
+    configBuilder.withString(DefaultDriverOption.REQUEST_CONSISTENCY, consistencyLevel)
 
-    val consistencyLevel: ConsistencyLevel = ConsistencyLevel.valueOf(intpr.getProperty(CASSANDRA_QUERY_DEFAULT_CONSISTENCY))
-    queryOptions.setConsistencyLevel(consistencyLevel)
-    queryOptionsInfo
-      .append("\t")
-      .append(CASSANDRA_QUERY_DEFAULT_CONSISTENCY)
-      .append(" : ")
-      .append(consistencyLevel)
-      .append("\n")
+    val serialConsistencyLevel = intpr.getProperty(CASSANDRA_QUERY_DEFAULT_SERIAL_CONSISTENCY,
+      CassandraInterpreter.DEFAULT_SERIAL_CONSISTENCY)
+    configBuilder.withString(DefaultDriverOption.REQUEST_SERIAL_CONSISTENCY, serialConsistencyLevel)
 
-    val serialConsistencyLevel: ConsistencyLevel = ConsistencyLevel.valueOf(intpr.getProperty(CASSANDRA_QUERY_DEFAULT_SERIAL_CONSISTENCY))
-    queryOptions.setSerialConsistencyLevel(serialConsistencyLevel)
-    queryOptionsInfo
-      .append("\t")
-      .append(CASSANDRA_QUERY_DEFAULT_SERIAL_CONSISTENCY)
-      .append(" : ")
-      .append(serialConsistencyLevel)
-      .append("\n")
+    val fetchSize = intpr.getProperty(CASSANDRA_QUERY_DEFAULT_FETCH_SIZE,
+      CassandraInterpreter.DEFAULT_FETCH_SIZE).toInt
+    configBuilder.withInt(DefaultDriverOption.REQUEST_PAGE_SIZE, fetchSize)
 
-    val fetchSize: Int = intpr.getProperty(CASSANDRA_QUERY_DEFAULT_FETCH_SIZE).toInt
-    queryOptions.setFetchSize(fetchSize)
-    queryOptionsInfo
-      .append("\t")
-      .append(CASSANDRA_QUERY_DEFAULT_FETCH_SIZE)
-      .append(" : ")
-      .append(fetchSize)
-      .append("\n")
-
-    val defaultIdempotence: Boolean = parseBoolean(intpr.getProperty(CASSANDRA_QUERY_DEFAULT_IDEMPOTENCE))
-    queryOptions.setDefaultIdempotence(defaultIdempotence)
-    queryOptionsInfo
-      .append("\t")
-      .append(CASSANDRA_QUERY_DEFAULT_IDEMPOTENCE)
-      .append(" : ")
-      .append(defaultIdempotence)
-      .append("\n")
-
-    LOGGER.debug(queryOptionsInfo.append("\n").toString)
-
-    return queryOptions
+    configBuilder.withBoolean(DefaultDriverOption.REQUEST_DEFAULT_IDEMPOTENCE,
+      parseBoolean(intpr.getProperty(CASSANDRA_QUERY_DEFAULT_IDEMPOTENCE)))
   }
 
-  def getProtocolVersion(intpr: Interpreter): ProtocolVersion = {
-    val protocolVersion: String = intpr.getProperty(CASSANDRA_PROTOCOL_VERSION)
+  val PROTOCOL_MAPPING: Map[String, ProtocolVersion] = Map("3" -> ProtocolVersion.V3, "4" -> ProtocolVersion.V4,
+    "5" -> ProtocolVersion.V5, "DSE1" -> ProtocolVersion.DSE_V1, "DSE2" -> ProtocolVersion.DSE_V2)
+
+  def setProtocolVersion(intpr: Interpreter, configBuilder: ProgrammaticDriverConfigLoaderBuilder): Unit = {
+    val protocolVersion: String = intpr.getProperty(CASSANDRA_PROTOCOL_VERSION,
+      CassandraInterpreter.DEFAULT_PROTOCOL_VERSION)
     LOGGER.debug("Protocol version : " + protocolVersion)
 
     protocolVersion match {
-      case "1" =>
-        defaultMaxConnectionPerHostLocal = "8"
-        defaultMaxConnectionPerHostRemote = "2"
-        defaultCoreConnectionPerHostLocal = "2"
-        defaultCoreConnectionPerHostRemote = "1"
-        defaultNewConnectionThresholdLocal = "100"
-        defaultNewConnectionThresholdRemote = "1"
-        defaultMaxRequestPerConnectionLocal = "128"
-        defaultMaxRequestPerConnectionRemote = "128"
-        return ProtocolVersion.V1
-      case "2" =>
-        defaultMaxConnectionPerHostLocal = "8"
-        defaultMaxConnectionPerHostRemote = "2"
-        defaultCoreConnectionPerHostLocal = "2"
-        defaultCoreConnectionPerHostRemote = "1"
-        defaultNewConnectionThresholdLocal = "100"
-        defaultNewConnectionThresholdRemote = "1"
-        defaultMaxRequestPerConnectionLocal = "128"
-        defaultMaxRequestPerConnectionRemote = "128"
-        return ProtocolVersion.V2
-      case "3" =>
-        defaultMaxConnectionPerHostLocal = "1"
-        defaultMaxConnectionPerHostRemote = "1"
-        defaultCoreConnectionPerHostLocal = "1"
-        defaultCoreConnectionPerHostRemote = "1"
-        defaultNewConnectionThresholdLocal = "800"
-        defaultNewConnectionThresholdRemote = "200"
-        defaultMaxRequestPerConnectionLocal = "1024"
-        defaultMaxRequestPerConnectionRemote = "256"
-        return ProtocolVersion.V3
-      case "4" =>
-        defaultMaxConnectionPerHostLocal = "1"
-        defaultMaxConnectionPerHostRemote = "1"
-        defaultCoreConnectionPerHostLocal = "1"
-        defaultCoreConnectionPerHostRemote = "1"
-        defaultNewConnectionThresholdLocal = "800"
-        defaultNewConnectionThresholdRemote = "200"
-        defaultMaxRequestPerConnectionLocal = "1024"
-        defaultMaxRequestPerConnectionRemote = "256"
-        return ProtocolVersion.V4
+      case "1" | "2" =>
+        throw new RuntimeException(s"Protocol V${protocolVersion} isn't supported")
       case _ =>
-        defaultMaxConnectionPerHostLocal = "1"
-        defaultMaxConnectionPerHostRemote = "1"
-        defaultCoreConnectionPerHostLocal = "1"
-        defaultCoreConnectionPerHostRemote = "1"
-        defaultNewConnectionThresholdLocal = "800"
-        defaultNewConnectionThresholdRemote = "200"
-        defaultMaxRequestPerConnectionLocal = "1024"
-        defaultMaxRequestPerConnectionRemote = "256"
-        return ProtocolVersion.NEWEST_SUPPORTED
+        configBuilder.withString(DefaultDriverOption.PROTOCOL_VERSION,
+          PROTOCOL_MAPPING.getOrElse(protocolVersion, ProtocolVersion.DEFAULT).name())
     }
   }
 
-  def getPoolingOptions(intpr: Interpreter): PoolingOptions = {
-    val poolingOptions: PoolingOptions = new PoolingOptions
+  def setPoolingOptions(intpr: Interpreter, configBuilder: ProgrammaticDriverConfigLoaderBuilder): Unit = {
     val poolingOptionsInfo: StringBuilder = new StringBuilder("Pooling options : \n\n")
 
-    val maxConnPerHostLocal: Int = intpr.getProperty(CASSANDRA_POOLING_MAX_CONNECTION_PER_HOST_LOCAL).toInt
-    poolingOptions.setMaxConnectionsPerHost(LOCAL, maxConnPerHostLocal)
-    poolingOptionsInfo
-      .append("\t")
-      .append(CASSANDRA_POOLING_MAX_CONNECTION_PER_HOST_LOCAL)
-      .append(" : ")
-      .append(maxConnPerHostLocal)
-      .append("\n")
-
-    val maxConnPerHostRemote: Int = intpr.getProperty(CASSANDRA_POOLING_MAX_CONNECTION_PER_HOST_REMOTE).toInt
-    poolingOptions.setMaxConnectionsPerHost(REMOTE, maxConnPerHostRemote)
-    poolingOptionsInfo
-      .append("\t")
-      .append(CASSANDRA_POOLING_MAX_CONNECTION_PER_HOST_REMOTE)
-      .append(" : ")
-      .append(maxConnPerHostRemote)
-      .append("\n")
-
-    val coreConnPerHostLocal: Int = intpr.getProperty(CASSANDRA_POOLING_CORE_CONNECTION_PER_HOST_LOCAL).toInt
-    poolingOptions.setCoreConnectionsPerHost(LOCAL, coreConnPerHostLocal)
+    val coreConnPerHostLocal: Int = intpr.getProperty(CASSANDRA_POOLING_CONNECTION_PER_HOST_LOCAL,
+      DEFAULT_CONNECTIONS_PER_HOST).toInt
+    configBuilder.withInt(DefaultDriverOption.CONNECTION_POOL_LOCAL_SIZE, coreConnPerHostLocal)
     poolingOptionsInfo
       .append("\t")
-      .append(CASSANDRA_POOLING_CORE_CONNECTION_PER_HOST_LOCAL)
+      .append(CASSANDRA_POOLING_CONNECTION_PER_HOST_LOCAL)
       .append(" : ")
       .append(coreConnPerHostLocal)
       .append("\n")
 
-    val coreConnPerHostRemote: Int = intpr.getProperty(CASSANDRA_POOLING_CORE_CONNECTION_PER_HOST_REMOTE).toInt
-    poolingOptions.setCoreConnectionsPerHost(REMOTE, coreConnPerHostRemote)
+    val coreConnPerHostRemote: Int = intpr.getProperty(CASSANDRA_POOLING_CONNECTION_PER_HOST_REMOTE,
+      DEFAULT_CONNECTIONS_PER_HOST).toInt
+    configBuilder.withInt(DefaultDriverOption.CONNECTION_POOL_REMOTE_SIZE, coreConnPerHostRemote)
     poolingOptionsInfo
       .append("\t")
-      .append(CASSANDRA_POOLING_CORE_CONNECTION_PER_HOST_REMOTE)
+      .append(CASSANDRA_POOLING_CONNECTION_PER_HOST_REMOTE)
       .append(" : ")
       .append(coreConnPerHostRemote)
       .append("\n")
 
-    val newConnThresholdLocal: Int = intpr.getProperty(CASSANDRA_POOLING_NEW_CONNECTION_THRESHOLD_LOCAL).toInt
-    poolingOptions.setNewConnectionThreshold(LOCAL, newConnThresholdLocal)
-    poolingOptionsInfo
-      .append("\t")
-      .append(CASSANDRA_POOLING_NEW_CONNECTION_THRESHOLD_LOCAL)
-      .append(" : ")
-      .append(newConnThresholdLocal)
-      .append("\n")
-
-    val newConnThresholdRemote: Int = intpr.getProperty(CASSANDRA_POOLING_NEW_CONNECTION_THRESHOLD_REMOTE).toInt
-    poolingOptions.setNewConnectionThreshold(REMOTE, newConnThresholdRemote)
-    poolingOptionsInfo
-      .append("\t")
-      .append(CASSANDRA_POOLING_NEW_CONNECTION_THRESHOLD_REMOTE)
-      .append(" : ")
-      .append(newConnThresholdRemote)
-      .append("\n")
-
-    val maxReqPerConnLocal: Int = intpr.getProperty(CASSANDRA_POOLING_MAX_REQUESTS_PER_CONNECTION_LOCAL).toInt
-    poolingOptions.setMaxRequestsPerConnection(LOCAL, maxReqPerConnLocal)
+    val maxReqPerConnLocal: Int = intpr.getProperty(CASSANDRA_POOLING_MAX_REQUESTS_PER_CONNECTION,
+      DEFAULT_MAX_REQUEST_PER_CONNECTION).toInt
+    configBuilder.withInt(DefaultDriverOption.CONNECTION_MAX_REQUESTS, maxReqPerConnLocal)
     poolingOptionsInfo
       .append("\t")
-      .append(CASSANDRA_POOLING_MAX_REQUESTS_PER_CONNECTION_LOCAL)
+      .append(CASSANDRA_POOLING_MAX_REQUESTS_PER_CONNECTION)
       .append(" : ")
       .append(maxReqPerConnLocal)
       .append("\n")
 
-    val maxReqPerConnRemote: Int = intpr.getProperty(CASSANDRA_POOLING_MAX_REQUESTS_PER_CONNECTION_REMOTE).toInt
-    poolingOptions.setMaxRequestsPerConnection(REMOTE, maxReqPerConnRemote)
-    poolingOptionsInfo
-      .append("\t")
-      .append(CASSANDRA_POOLING_MAX_REQUESTS_PER_CONNECTION_REMOTE)
-      .append(" : ")
-      .append(maxReqPerConnRemote)
-      .append("\n")
-
-    val heartbeatIntervalSeconds: Int = intpr.getProperty(CASSANDRA_POOLING_HEARTBEAT_INTERVAL_SECONDS).toInt
-    poolingOptions.setHeartbeatIntervalSeconds(heartbeatIntervalSeconds)
+    val heartbeatIntervalSeconds: Int = intpr.getProperty(CASSANDRA_POOLING_HEARTBEAT_INTERVAL_SECONDS,
+      DEFAULT_HEARTBEAT_INTERVAL).toInt
+    configBuilder.withInt(DefaultDriverOption.HEARTBEAT_INTERVAL, heartbeatIntervalSeconds)
     poolingOptionsInfo
       .append("\t")
       .append(CASSANDRA_POOLING_HEARTBEAT_INTERVAL_SECONDS)
@@ -315,126 +149,59 @@ class JavaDriverConfig {
       .append(heartbeatIntervalSeconds)
       .append("\n")
 
-    val idleTimeoutSeconds: Int = intpr.getProperty(CASSANDRA_POOLING_IDLE_TIMEOUT_SECONDS).toInt
-    poolingOptions.setIdleTimeoutSeconds(idleTimeoutSeconds)
-    poolingOptionsInfo
-      .append("\t")
-      .append(CASSANDRA_POOLING_IDLE_TIMEOUT_SECONDS)
-      .append(" : ")
-      .append(idleTimeoutSeconds)
-      .append("\n")
-
-    val poolTimeoutMillis: Int = intpr.getProperty(CASSANDRA_POOLING_POOL_TIMEOUT_MILLIS).toInt
-    poolingOptions.setPoolTimeoutMillis(poolTimeoutMillis)
+    val poolTimeoutMillis: Int = intpr.getProperty(CASSANDRA_POOLING_POOL_TIMEOUT_MILLIS,
+      DEFAULT_POOL_TIMEOUT).toInt
+    configBuilder.withInt(DefaultDriverOption.HEARTBEAT_TIMEOUT, poolTimeoutMillis)
     poolingOptionsInfo
       .append("\t")
       .append(CASSANDRA_POOLING_POOL_TIMEOUT_MILLIS)
       .append(" : ")
       .append(poolTimeoutMillis)
       .append("\n")
       .append("\n")
 
     LOGGER.debug(poolingOptionsInfo.append("\n").toString)
-
-    return poolingOptions
   }
 
-  def getCompressionProtocol(intpr: Interpreter): ProtocolOptions.Compression = {
-    var compression: ProtocolOptions.Compression = null
-    val compressionProtocol: String = intpr.getProperty(CASSANDRA_COMPRESSION_PROTOCOL)
-
+  def setCompressionProtocol(intpr: Interpreter, configBuilder: ProgrammaticDriverConfigLoaderBuilder): Unit = {
+    val compressionProtocol = intpr.getProperty(CASSANDRA_COMPRESSION_PROTOCOL,
+      CassandraInterpreter.DEFAULT_COMPRESSION).toLowerCase
     LOGGER.debug("Compression protocol : " + compressionProtocol)
 
-    if (compressionProtocol == null) "NONE"
-    else compressionProtocol.toUpperCase match {
-      case "NONE" =>
-        compression = Compression.NONE
-      case "SNAPPY" =>
-        compression = Compression.SNAPPY
-      case "LZ4" =>
-        compression = Compression.LZ4
-      case _ =>
-        compression = Compression.NONE
+    compressionProtocol match {
+      case "snappy" | "lz4" =>
+        configBuilder.withString(DefaultDriverOption.PROTOCOL_COMPRESSION, compressionProtocol)
+      case _ => ()
     }
-    return compression
   }
 
-  def getLoadBalancingPolicy(intpr: Interpreter): LoadBalancingPolicy = {
-    val loadBalancingPolicy: String = intpr.getProperty(CASSANDRA_LOAD_BALANCING_POLICY)
-    LOGGER.debug("Load Balancing Policy : " + loadBalancingPolicy)
-
-    if (isBlank(loadBalancingPolicy) || (DEFAULT_POLICY == loadBalancingPolicy)) {
-      return Policies.defaultLoadBalancingPolicy
-    }
-    else {
-      try {
-        return (Class.forName(loadBalancingPolicy).asInstanceOf[Class[LoadBalancingPolicy]]).newInstance
-      }
-      catch {
-        case e: Any => {
-          e.printStackTrace
-          throw new RuntimeException("Cannot instantiate " + CASSANDRA_LOAD_BALANCING_POLICY + " = " + loadBalancingPolicy)
-        }
-      }
-    }
+  private def isNotDefaultParameter(param: String) = {
+    !(isBlank(param) || DEFAULT_POLICY == param)
   }
 
-  def getRetryPolicy(intpr: Interpreter): RetryPolicy = {
+  def setRetryPolicy(intpr: Interpreter, configBuilder: ProgrammaticDriverConfigLoaderBuilder): Unit = {
     val retryPolicy: String = intpr.getProperty(CASSANDRA_RETRY_POLICY)
     LOGGER.debug("Retry Policy : " + retryPolicy)
 
-    if (isBlank(retryPolicy) || (DEFAULT_POLICY == retryPolicy)) {
-      return Policies.defaultRetryPolicy
-    }
-    else {
-      try {
-        return (Class.forName(retryPolicy).asInstanceOf[Class[RetryPolicy]]).newInstance
-      }
-      catch {
-        case e: Any => {
-          e.printStackTrace
-          throw new RuntimeException("Cannot instantiate " + CASSANDRA_RETRY_POLICY + " = " + retryPolicy)
-        }
-      }
+    if (isNotDefaultParameter(retryPolicy)) {
+      configBuilder.withString(DefaultDriverOption.RETRY_POLICY_CLASS, retryPolicy)
     }
   }
 
-  def getReconnectionPolicy(intpr: Interpreter): ReconnectionPolicy = {
+  def setReconnectionPolicy(intpr: Interpreter, configBuilder: ProgrammaticDriverConfigLoaderBuilder): Unit = {
     val reconnectionPolicy: String = intpr.getProperty(CASSANDRA_RECONNECTION_POLICY)
     LOGGER.debug("Reconnection Policy : " + reconnectionPolicy)
 
-    if (isBlank(reconnectionPolicy) || (DEFAULT_POLICY == reconnectionPolicy)) {
-      return Policies.defaultReconnectionPolicy
-    }
-    else {
-      try {
-        return (Class.forName(reconnectionPolicy).asInstanceOf[Class[ReconnectionPolicy]]).newInstance
-      }
-      catch {
-        case e: Any => {
-          e.printStackTrace
-          throw new RuntimeException("Cannot instantiate " + CASSANDRA_RECONNECTION_POLICY + " = " + reconnectionPolicy)
-        }
-      }
+    if (isNotDefaultParameter(reconnectionPolicy)) {
+      configBuilder.withString(DefaultDriverOption.RECONNECTION_POLICY_CLASS, reconnectionPolicy)
     }
   }
 
-  def getSpeculativeExecutionPolicy(intpr: Interpreter): SpeculativeExecutionPolicy = {
+  def setSpeculativeExecutionPolicy(intpr: Interpreter, configBuilder: ProgrammaticDriverConfigLoaderBuilder): Unit = {
     val specExecPolicy: String = intpr.getProperty(CASSANDRA_SPECULATIVE_EXECUTION_POLICY)
     LOGGER.debug("Speculative Execution Policy : " + specExecPolicy)
 
-    if (isBlank(specExecPolicy) || (DEFAULT_POLICY == specExecPolicy)) {
-      return Policies.defaultSpeculativeExecutionPolicy
+    if (isNotDefaultParameter(specExecPolicy)) {
+      configBuilder.withString(DefaultDriverOption.SPECULATIVE_EXECUTION_POLICY_CLASS, specExecPolicy)
     }
-    else {
-      try {
-        return (Class.forName(specExecPolicy).asInstanceOf[Class[SpeculativeExecutionPolicy]]).newInstance
-      }
-      catch {
-        case e: Any => {
-          e.printStackTrace
-          throw new RuntimeException("Cannot instantiate " + CASSANDRA_SPECULATIVE_EXECUTION_POLICY + " = " + specExecPolicy)
-        }
-      }
-    }
-  }  
+  }
 }
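
The refactoring above collapses the driver 3.x per-feature option objects (SocketOptions, QueryOptions, PoolingOptions, policy instances) into plain key/value settings on a single ProgrammaticDriverConfigLoaderBuilder. One recurring idiom is the map lookup with a fallback, as in PROTOCOL_MAPPING.getOrElse(protocolVersion, ProtocolVersion.DEFAULT). A minimal, self-contained sketch of that idiom — the types below are placeholders, not the real com.datastax.oss ProtocolVersion:

```scala
// Sketch of setProtocolVersion's lookup-with-fallback idiom: unknown
// property values resolve to a default instead of throwing. "Version"
// is a stand-in enum, not the driver's ProtocolVersion class.
object ProtocolMappingSketch {
  sealed abstract class Version(val name: String)
  case object V3 extends Version("V3")
  case object V4 extends Version("V4")
  case object V5 extends Version("V5")
  // Assumed default; the real driver picks its own default version.
  val Default: Version = V4

  val mapping: Map[String, Version] =
    Map("3" -> V3, "4" -> V4, "5" -> V5)

  // Mirrors PROTOCOL_MAPPING.getOrElse(...).name()
  def resolve(property: String): String =
    mapping.getOrElse(property, Default).name
}
```

The config builder then receives the resolved name as a string, which is why the mapping ends with `.name()` rather than passing the enum itself.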
diff --git a/cassandra/src/main/scala/org/apache/zeppelin/cassandra/MetaDataHierarchy.scala b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/MetaDataHierarchy.scala
index 4b88776..1f6a814 100644
--- a/cassandra/src/main/scala/org/apache/zeppelin/cassandra/MetaDataHierarchy.scala
+++ b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/MetaDataHierarchy.scala
@@ -18,8 +18,8 @@ package org.apache.zeppelin.cassandra
 
 import java.util.UUID
 
-import com.datastax.driver.core.utils.UUIDs
-import com.datastax.driver.core.{DataType, TableMetadata}
+import com.datastax.oss.driver.api.core.`type`.DataType
+import com.datastax.oss.driver.api.core.uuid.Uuids
 
 import scala.util.parsing.json.JSONObject
 
@@ -28,15 +28,14 @@ import scala.util.parsing.json.JSONObject
  */
 object MetaDataHierarchy {
   object OrderConverter {
-    def convert(clusteringOrder: com.datastax.driver.core.ClusteringOrder): ClusteringOrder = {
+    def convert(clusteringOrder: com.datastax.oss.driver.api.core.metadata.schema.ClusteringOrder): ClusteringOrder = {
       clusteringOrder match {
-        case com.datastax.driver.core.ClusteringOrder.ASC => ASC
-        case com.datastax.driver.core.ClusteringOrder.DESC => DESC
+        case com.datastax.oss.driver.api.core.metadata.schema.ClusteringOrder.ASC => ASC
+        case com.datastax.oss.driver.api.core.metadata.schema.ClusteringOrder.DESC => DESC
       }
     }
   }
 
-
   sealed trait ClusteringOrder
   object ASC extends ClusteringOrder
   object DESC extends ClusteringOrder
@@ -50,33 +49,36 @@ object MetaDataHierarchy {
   case class ColumnDetails(name: String, columnType: ColumnType, dataType: DataType)
 
   case class ClusterDetails(name: String, partitioner: String)
-  case class ClusterContent(clusterName: String, clusterDetails: String, keyspaces: List[(UUID, String, String)])
-  case class AllTables(tables: Map[String,List[String]])
-  case class KeyspaceDetails(name: String, replication: Map[String,String], durableWrites: Boolean, asCQL: String, uniqueId: UUID = UUIDs.timeBased()) {
+  case class ClusterContent(clusterName: String, clusterDetails: String, keyspaces: Seq[(UUID, String, String)])
+  case class AllTables(tables: Map[String, Seq[String]])
+  case class KeyspaceDetails(name: String, replication: Map[String,String], durableWrites: Boolean,
+                             asCQL: String, uniqueId: UUID = Uuids.timeBased()) {
     def getReplicationMap: String = {
       JSONObject(replication).toString().replaceAll(""""""","'")
     }
   }
   case class KeyspaceContent(keyspaceName: String, keyspaceDetails: String,
-                             tables: List[(UUID,String, String)],
-                             views: List[(UUID,String, String)],
-                             udts: List[(UUID, String, String)],
-                             functions: List[(UUID, String, String)],
-                             aggregates: List[(UUID, String, String)])
-  case class TableDetails(tableName: String, columns: List[ColumnDetails], indices: List[IndexDetails], asCQL: String, indicesAsCQL: String, uniqueId: UUID = UUIDs.timeBased())
-  case class UDTDetails(typeName: String, columns: List[ColumnDetails], asCQL: String, uniqueId: UUID = UUIDs.timeBased())
+                             tables: Seq[(UUID,String, String)],
+                             views: Seq[(UUID,String, String)],
+                             udts: Seq[(UUID, String, String)],
+                             functions: Seq[(UUID, String, String)],
+                             aggregates: Seq[(UUID, String, String)])
+  case class TableDetails(tableName: String, columns: Seq[ColumnDetails], indices: Seq[IndexDetails], asCQL: String,
+                          indicesAsCQL: String, uniqueId: UUID = Uuids.timeBased())
+  case class UDTDetails(typeName: String, columns: Seq[ColumnDetails], asCQL: String, uniqueId: UUID = Uuids.timeBased())
 
-  case class SameNameFunctionDetails(functions: List[FunctionDetails])
-  case class FunctionDetails(keyspace:String, name: String, arguments: List[String], calledOnNullInput: Boolean, returnType: String,
-    language:String, body: String, asCQL: String, uniqueId: UUID = UUIDs.timeBased())
-  case class FunctionSummary(keyspace:String, name: String, arguments: List[String], returnType: String)
+  case class SameNameFunctionDetails(functions: Seq[FunctionDetails])
+  case class FunctionDetails(keyspace:String, name: String, arguments: Seq[String], calledOnNullInput: Boolean, returnType: String,
+    language:String, body: String, asCQL: String, uniqueId: UUID = Uuids.timeBased())
+  case class FunctionSummary(keyspace:String, name: String, arguments: Seq[String], returnType: String)
 
-  case class AggregateDetails(keyspace:String, name: String, arguments: List[String], sFunc: String, sType: String,
+  case class AggregateDetails(keyspace:String, name: String, arguments: Seq[String], sFunc: String, sType: String,
     finalFunc: Option[String], initCond: Option[String], returnType: String,
-    asCQL: String, uniqueId: UUID = UUIDs.timeBased())
-  case class AggregateSummary(keyspace:String, name: String, arguments: List[String], returnType: String)
-  case class SameNameAggregateDetails(aggregates: List[AggregateDetails])
+    asCQL: String, uniqueId: UUID = Uuids.timeBased())
+  case class AggregateSummary(keyspace:String, name: String, arguments: Seq[String], returnType: String)
+  case class SameNameAggregateDetails(aggregates: Seq[AggregateDetails])
 
-  case class MaterializedViewDetails(name: String, columns: List[ColumnDetails], asCQL: String, baseTable: String, uniqueId: UUID = UUIDs.timeBased())
+  case class MaterializedViewDetails(name: String, columns: Seq[ColumnDetails], asCQL: String, baseTable: String,
+                                     uniqueId: UUID = Uuids.timeBased())
   case class MaterializedViewSummary(name: String, baseTable: String)
 }
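
The MetaDataHierarchy changes above widen the case-class field types from List to Seq. A quick self-contained sketch of why that widening needs no call-site changes — KeyspaceContentSketch is a simplified stand-in for the real KeyspaceContent, which also carries UUIDs:

```scala
// Seq-typed fields accept List, Vector, or any other immutable sequence
// directly, and Scala sequence equality compares by content, so callers
// building metadata from either collection type keep working unchanged.
object SeqWideningSketch {
  case class KeyspaceContentSketch(keyspaceName: String,
                                   tables: Seq[String],
                                   views: Seq[String])

  val fromList   = KeyspaceContentSketch("ks", List("t1", "t2"), Nil)
  val fromVector = KeyspaceContentSketch("ks", Vector("t1", "t2"), Nil)
}
```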
diff --git a/cassandra/src/main/scala/org/apache/zeppelin/cassandra/ParagraphParser.scala b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/ParagraphParser.scala
index 2c198ca..514161e 100644
--- a/cassandra/src/main/scala/org/apache/zeppelin/cassandra/ParagraphParser.scala
+++ b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/ParagraphParser.scala
@@ -16,9 +16,11 @@
  */
 package org.apache.zeppelin.cassandra
 
-import com.datastax.driver.core._
+import com.datastax.oss.driver.api.core.{ConsistencyLevel, DefaultConsistencyLevel}
+import com.datastax.oss.driver.api.core.cql.{BatchType, DefaultBatchType}
 import org.apache.zeppelin.cassandra.CassandraInterpreter._
 import org.apache.zeppelin.interpreter.InterpreterException
+
 import scala.util.matching.Regex
 import scala.util.parsing.combinator._
 import org.apache.zeppelin.cassandra.TextBlockHierarchy._
@@ -33,25 +35,24 @@ import org.apache.zeppelin.cassandra.TextBlockHierarchy._
   */
 object ParagraphParser {
 
-  val CONSISTENCY_LEVEL_PATTERN = ConsistencyLevel.values().toList
-    .map(_.name()).filter(!_.contains("SERIAL")).mkString("""^\s*@consistency\s*=\s*(""", "|" , """)\s*$""").r
+  val allConsistencyLevels: Seq[ConsistencyLevel] = DefaultConsistencyLevel.values().toSeq
+
+  val CONSISTENCY_LEVEL_PATTERN: Regex = allConsistencyLevels.filter(!_.isSerial).map(_.name)
+    .mkString("""^\s*@consistency\s*=\s*(""", "|" , """)\s*$""").r
 
-  val SERIAL_CONSISTENCY_LEVEL_PATTERN = ConsistencyLevel.values().toList
-    .map(_.name()).filter(_.contains("SERIAL")).mkString("""^\s*@serialConsistency\s*=\s*(""", "|", """)\s*$""").r
-  val TIMESTAMP_PATTERN = """^\s*@timestamp\s*=\s*([0-9]+)\s*$""".r
+  val SERIAL_CONSISTENCY_LEVEL_PATTERN: Regex = allConsistencyLevels.filter(_.isSerial).map(_.name)
+    .mkString("""^\s*@serialConsistency\s*=\s*(""", "|", """)\s*$""").r
+  val TIMESTAMP_PATTERN: Regex = """^\s*@timestamp\s*=\s*([0-9]+)\s*$""".r
 
-  val RETRY_POLICIES_PATTERN = List(DEFAULT_POLICY,DOWNGRADING_CONSISTENCY_RETRY, FALLTHROUGH_RETRY,
-    LOGGING_DEFAULT_RETRY, LOGGING_DOWNGRADING_RETRY, LOGGING_FALLTHROUGH_RETRY)
-    .mkString("""^\s*@retryPolicy\s*=\s*(""", "|" , """)\s*$""").r
-  val FETCHSIZE_PATTERN = """^\s*@fetchSize\s*=\s*([0-9]+)\s*$""".r
-  val REQUEST_TIMEOUT_PATTERN = """^\s*@requestTimeOut\s*=\s*([0-9]+)\s*$""".r
+  val FETCHSIZE_PATTERN: Regex = """^\s*@fetchSize\s*=\s*([0-9]+)\s*$""".r
+  val REQUEST_TIMEOUT_PATTERN: Regex = """^\s*@requestTimeOut\s*=\s*([0-9]+)\s*$""".r
 
-  val SIMPLE_STATEMENT_PATTERN = """([^;]+;)""".r
-  val PREPARE_STATEMENT_PATTERN = """^\s*@prepare\[([^]]+)\]\s*=\s*([^;]+)$""".r
-  val REMOVE_PREPARE_STATEMENT_PATTERN = """^\s*@remove_prepare\[([^]]+)\]\s*$""".r
+  val SIMPLE_STATEMENT_PATTERN: Regex = """([^;]+;)""".r
+  val PREPARE_STATEMENT_PATTERN: Regex = """^\s*@prepare\[([^]]+)\]\s*=\s*([^;]+)$""".r
+  val REMOVE_PREPARE_STATEMENT_PATTERN: Regex = """^\s*@remove_prepare\[([^]]+)\]\s*$""".r
 
-  val BIND_PATTERN = """^\s*@bind\[([^]]+)\](?:=([^;]+))?""".r
-  val BATCH_PATTERN = """^(?i)\s*BEGIN\s+(UNLOGGED|COUNTER)?\s*BATCH""".r
+  val BIND_PATTERN: Regex = """^\s*@bind\[([^]]+)\](?:=([^;]+))?""".r
+  val BATCH_PATTERN: Regex = """^(?i)\s*BEGIN\s+(UNLOGGED|COUNTER)?\s*BATCH""".r
 
   /**
     * Very complicated RegExp
@@ -67,131 +68,129 @@ object ParagraphParser {
     *                                followed by optional white-space(s) (\s*)
     *                                followed by semi-colon (;)
     */
-  val UDF_PATTERN = """(?is)\s*(CREATE(?:\s+OR REPLACE)?\s+FUNCTION(?:\s+IF\s+NOT\s+EXISTS)?.+?(?:\s+|\n|\r|\f)AS(?:\s+|\n|\r|\f)(?:'|\$\$).+?(?:'|\$\$)\s*;)""".r
+  val UDF_PATTERN: Regex = """(?is)\s*(CREATE(?:\s+OR REPLACE)?\s+FUNCTION(?:\s+IF\s+NOT\s+EXISTS)?.+?(?:\s+|\n|\r|\f)AS(?:\s+|\n|\r|\f)(?:'|\$\$).+?(?:'|\$\$)\s*;)""".r
 
-  val GENERIC_STATEMENT_PREFIX =
+  val GENERIC_STATEMENT_PREFIX: Regex =
     """(?is)\s*(?:INSERT|UPDATE|DELETE|SELECT|CREATE|ALTER|
       |DROP|GRANT|REVOKE|TRUNCATE|LIST|USE)\s+""".r
 
   val VALID_IDENTIFIER = "[a-z][a-z0-9_]*"
 
-  val DESCRIBE_CLUSTER_PATTERN = """^(?i)\s*(?:DESCRIBE|DESC)\s+CLUSTER\s*;\s*$""".r
+  val DESCRIBE_CLUSTER_PATTERN: Regex = """^(?i)\s*(?:DESCRIBE|DESC)\s+CLUSTER\s*;\s*$""".r
 
 
-  val DESCRIBE_KEYSPACE_PATTERN = ("""^(?i)\s*(?:DESCRIBE|DESC)\s+KEYSPACE\s*("""+VALID_IDENTIFIER+""");\s*$""").r
-  val DESCRIBE_KEYSPACES_PATTERN = """^(?i)\s*(?:DESCRIBE|DESC)\s+KEYSPACES\s*;\s*$""".r
+  val DESCRIBE_KEYSPACE_PATTERN: Regex = ("""^(?i)\s*(?:DESCRIBE|DESC)\s+KEYSPACE\s*("""+VALID_IDENTIFIER+""");\s*$""").r
+  val DESCRIBE_KEYSPACES_PATTERN: Regex = """^(?i)\s*(?:DESCRIBE|DESC)\s+KEYSPACES\s*;\s*$""".r
 
 
-  val DESCRIBE_TABLE_PATTERN = ("""^(?i)\s*(?:DESCRIBE|DESC)\s+TABLE\s*("""+VALID_IDENTIFIER+""");\s*$""").r
-  val DESCRIBE_TABLE_WITH_KEYSPACE_PATTERN = ("""^(?i)\s*(?:DESCRIBE|DESC)\s+TABLE\s*(""" +
+  val DESCRIBE_TABLE_PATTERN: Regex = ("""^(?i)\s*(?:DESCRIBE|DESC)\s+TABLE\s*("""+VALID_IDENTIFIER+""");\s*$""").r
+  val DESCRIBE_TABLE_WITH_KEYSPACE_PATTERN: Regex = ("""^(?i)\s*(?:DESCRIBE|DESC)\s+TABLE\s*(""" +
                                                 VALID_IDENTIFIER +
                                                 """)\.(""" +
                                                 VALID_IDENTIFIER +
                                                 """);\s*$""").r
-  val DESCRIBE_TABLES_PATTERN = """^(?i)\s*(?:DESCRIBE|DESC)\s+TABLES\s*;\s*$""".r
+  val DESCRIBE_TABLES_PATTERN: Regex = """^(?i)\s*(?:DESCRIBE|DESC)\s+TABLES\s*;\s*$""".r
 
 
-  val DESCRIBE_TYPE_PATTERN = ("""^(?i)\s*(?:DESCRIBE|DESC)\s+TYPE\s*("""+VALID_IDENTIFIER+""");\s*$""").r
-  val DESCRIBE_TYPE_WITH_KEYSPACE_PATTERN = ("""^(?i)\s*(?:DESCRIBE|DESC)\s+TYPE\s*(""" +
+  val DESCRIBE_TYPE_PATTERN: Regex = ("""^(?i)\s*(?:DESCRIBE|DESC)\s+TYPE\s*("""+VALID_IDENTIFIER+""");\s*$""").r
+  val DESCRIBE_TYPE_WITH_KEYSPACE_PATTERN: Regex = ("""^(?i)\s*(?:DESCRIBE|DESC)\s+TYPE\s*(""" +
                                                 VALID_IDENTIFIER +
                                                 """)\.(""" +
                                                 VALID_IDENTIFIER +
                                                 """);\s*$""").r
-  val DESCRIBE_TYPES_PATTERN = """^(?i)\s*(?:DESCRIBE|DESC)\s+TYPES\s*;\s*$""".r
+  val DESCRIBE_TYPES_PATTERN: Regex = """^(?i)\s*(?:DESCRIBE|DESC)\s+TYPES\s*;\s*$""".r
 
 
-  val DESCRIBE_FUNCTION_PATTERN = ("""^(?i)\s*(?:DESCRIBE|DESC)\s+FUNCTION\s*("""+VALID_IDENTIFIER+""");\s*$""").r
-  val DESCRIBE_FUNCTION_WITH_KEYSPACE_PATTERN = ("""^(?i)\s*(?:DESCRIBE|DESC)\s+FUNCTION\s*(""" +
+  val DESCRIBE_FUNCTION_PATTERN: Regex = ("""^(?i)\s*(?:DESCRIBE|DESC)\s+FUNCTION\s*("""+VALID_IDENTIFIER+""");\s*$""").r
+  val DESCRIBE_FUNCTION_WITH_KEYSPACE_PATTERN: Regex = ("""^(?i)\s*(?:DESCRIBE|DESC)\s+FUNCTION\s*(""" +
                                                   VALID_IDENTIFIER +
                                                   """)\.(""" +
                                                   VALID_IDENTIFIER +
                                                   """);\s*$""").r
-  val DESCRIBE_FUNCTIONS_PATTERN = ("""^(?i)\s*(?:DESCRIBE|DESC)\s+FUNCTIONS\s*;\s*$""").r
+  val DESCRIBE_FUNCTIONS_PATTERN: Regex = """^(?i)\s*(?:DESCRIBE|DESC)\s+FUNCTIONS\s*;\s*$""".r
 
 
-  val DESCRIBE_AGGREGATE_PATTERN = ("""^(?i)\s*(?:DESCRIBE|DESC)\s+AGGREGATE\s*("""+VALID_IDENTIFIER+""");\s*$""").r
-  val DESCRIBE_AGGREGATE_WITH_KEYSPACE_PATTERN = ("""^(?i)\s*(?:DESCRIBE|DESC)\s+AGGREGATE\s*(""" +
+  val DESCRIBE_AGGREGATE_PATTERN: Regex = ("""^(?i)\s*(?:DESCRIBE|DESC)\s+AGGREGATE\s*("""+VALID_IDENTIFIER+""");\s*$""").r
+  val DESCRIBE_AGGREGATE_WITH_KEYSPACE_PATTERN: Regex = ("""^(?i)\s*(?:DESCRIBE|DESC)\s+AGGREGATE\s*(""" +
                                                     VALID_IDENTIFIER +
                                                     """)\.(""" +
                                                     VALID_IDENTIFIER +
                                                     """);\s*$""").r
-  val DESCRIBE_AGGREGATES_PATTERN = ("""^(?i)\s*(?:DESCRIBE|DESC)\s+AGGREGATES\s*;\s*$""").r
+  val DESCRIBE_AGGREGATES_PATTERN: Regex = """^(?i)\s*(?:DESCRIBE|DESC)\s+AGGREGATES\s*;\s*$""".r
 
 
-  val DESCRIBE_MATERIALIZED_VIEW_PATTERN = ("""^(?i)\s*(?:DESCRIBE|DESC)\s+MATERIALIZED\s+VIEW\s*("""+VALID_IDENTIFIER+""")\s*;\s*$""").r
-  val DESCRIBE_MATERIALIZED_VIEW_WITH_KEYSPACE_PATTERN = ("""^(?i)\s*(?:DESCRIBE|DESC)\s+MATERIALIZED\s+VIEW\s*(""" +
+  val DESCRIBE_MATERIALIZED_VIEW_PATTERN: Regex = ("""^(?i)\s*(?:DESCRIBE|DESC)\s+MATERIALIZED\s+VIEW\s*("""+VALID_IDENTIFIER+""")\s*;\s*$""").r
+  val DESCRIBE_MATERIALIZED_VIEW_WITH_KEYSPACE_PATTERN: Regex = ("""^(?i)\s*(?:DESCRIBE|DESC)\s+MATERIALIZED\s+VIEW\s*(""" +
                                                             VALID_IDENTIFIER +
                                                             """)\.(""" +
                                                             VALID_IDENTIFIER +
                                                             """)\s*;\s*$""").r
-  val DESCRIBE_MATERIALIZED_VIEWS_PATTERN = ("""^(?i)\s*(?:DESCRIBE|DESC)\s+MATERIALIZED\s+VIEWS\s*;\s*$""").r
-
+  val DESCRIBE_MATERIALIZED_VIEWS_PATTERN: Regex = """^(?i)\s*(?:DESCRIBE|DESC)\s+MATERIALIZED\s+VIEWS\s*;\s*$""".r
 
-  val HELP_PATTERN = """^(?i)\s*HELP;\s*$""".r
+  val HELP_PATTERN: Regex = """^(?i)\s*HELP;\s*$""".r
 }
 
 class ParagraphParser extends RegexParsers{
 
-
   import ParagraphParser._
 
-  def singleLineCommentHash: Parser[Comment] = """\s*#.*""".r ^^ {case text => Comment(text.trim.replaceAll("#",""))}
-  def singleLineCommentDoubleSlashes: Parser[Comment] = """\s*//.*""".r ^^ {case text => Comment(text.trim.replaceFirst("//\\s*",""))}
-  def singleLineCommentDoubleDash: Parser[Comment] = """\s*--.*""".r ^^ {case text => Comment(text.trim.replaceFirst("//\\s*",""))}
+  def singleLineCommentHash: Parser[Comment] = """\s*#.*""".r ^^ {text => Comment(text.trim.replaceAll("#",""))}
+  def singleLineCommentDoubleSlashes: Parser[Comment] = """\s*//.*""".r ^^ {text => Comment(text.trim.replaceFirst("//\\s*",""))}
+  def singleLineCommentDoubleDash: Parser[Comment] = """\s*--.*""".r ^^ {text => Comment(text.trim.replaceFirst("//\\s*",""))}
   def singleLineComment: Parser[Comment] = singleLineCommentHash | singleLineCommentDoubleSlashes | singleLineCommentDoubleDash
 
-  def multiLineComment: Parser[Comment] = """(?s)/\*(.*)\*/""".r ^^ {case text => Comment(text.trim.replaceAll("""/\*""","").replaceAll("""\*/""",""))}
+  def multiLineComment: Parser[Comment] = """(?s)/\*(.*)\*/""".r ^^ {text => Comment(text.trim.replaceAll("""/\*""","").replaceAll("""\*/""",""))}
 
   //Query parameters
-  def consistency: Parser[Consistency] = """\s*@consistency.+""".r ^^ {case x => extractConsistency(x.trim)}
-  def serialConsistency: Parser[SerialConsistency] = """\s*@serialConsistency.+""".r ^^ {case x => extractSerialConsistency(x.trim)}
-  def timestamp: Parser[Timestamp] = """\s*@timestamp.+""".r ^^ {case x => extractTimestamp(x.trim)}
-  def retryPolicy: Parser[RetryPolicy] = """\s*@retryPolicy.+""".r ^^ {case x => extractRetryPolicy(x.trim)}
-  def fetchSize: Parser[FetchSize] = """\s*@fetchSize.+""".r ^^ {case x => extractFetchSize(x.trim)}
-  def requestTimeOut: Parser[RequestTimeOut] = """\s*@requestTimeOut.+""".r ^^ {case x => extractRequestTimeOut(x.trim)}
+  def consistency: Parser[Consistency] = """\s*@consistency.+""".r ^^ {x => extractConsistency(x.trim)}
+  def serialConsistency: Parser[SerialConsistency] = """\s*@serialConsistency.+""".r ^^ {x => extractSerialConsistency(x.trim)}
+  def timestamp: Parser[Timestamp] = """\s*@timestamp.+""".r ^^ {x => extractTimestamp(x.trim)}
+  def fetchSize: Parser[FetchSize] = """\s*@fetchSize.+""".r ^^ {x => extractFetchSize(x.trim)}
+  def requestTimeOut: Parser[RequestTimeOut] = """\s*@requestTimeOut.+""".r ^^ {x => extractRequestTimeOut(x.trim)}
 
   //Statements
-  def createFunctionStatement: Parser[SimpleStm] = UDF_PATTERN ^^{case x => extractUdfStatement(x.trim)}
-  def genericStatement: Parser[SimpleStm] = s"""$GENERIC_STATEMENT_PREFIX[^;]+;""".r ^^ {case x => extractSimpleStatement(x.trim)}
+  def createFunctionStatement(): Parser[SimpleStm] = UDF_PATTERN ^^{x => extractUdfStatement(x.trim)}
+  def genericStatement(): Parser[SimpleStm] = s"""$GENERIC_STATEMENT_PREFIX[^;]+;""".r ^^ {x => extractSimpleStatement(x.trim)}
 //  def allStatement: Parser[SimpleStm] = udfStatement | genericStatement
 
-  def prepare: Parser[PrepareStm] = """\s*@prepare.+""".r ^^ {case x => extractPreparedStatement(x.trim)}
-  def removePrepare: Parser[RemovePrepareStm] = """\s*@remove_prepare.+""".r ^^ {case x => extractRemovePreparedStatement(x.trim)}
-  def bind: Parser[BoundStm] = """\s*@bind.+""".r ^^ {case x => extractBoundStatement(x.trim)}
+  def prepare(): Parser[PrepareStm] = """\s*@prepare.+""".r ^^ {x => extractPreparedStatement(x.trim)}
+  def removePrepare(): Parser[RemovePrepareStm] = """\s*@remove_prepare.+""".r ^^ {x => extractRemovePreparedStatement(x.trim)}
+  def bind(): Parser[BoundStm] = """\s*@bind.+""".r ^^ {x => extractBoundStatement(x.trim)}
 
 
   //Meta data
-  private def describeCluster: Parser[DescribeClusterCmd] = """(?i)\s*(?:DESCRIBE|DESC)\s+CLUSTER.*""".r ^^ {extractDescribeClusterCmd(_)}
-  private def describeKeyspaces: Parser[DescribeKeyspacesCmd] = """(?i)\s*(?:DESCRIBE|DESC)\s+KEYSPACES.*""".r ^^ {extractDescribeKeyspacesCmd(_)}
-  private def describeTables: Parser[DescribeTablesCmd] = """(?i)\s*(?:DESCRIBE|DESC)\s+TABLES.*""".r ^^ {extractDescribeTablesCmd(_)}
-  private def describeTypes: Parser[DescribeTypesCmd] = """(?i)\s*(?:DESCRIBE|DESC)\s+TYPES.*""".r ^^ {extractDescribeTypesCmd(_)}
-  private def describeFunctions: Parser[DescribeFunctionsCmd] = """(?i)\s*(?:DESCRIBE|DESC)\s+FUNCTIONS.*""".r ^^ {extractDescribeFunctionsCmd(_)}
-  private def describeAggregates: Parser[DescribeAggregatesCmd] = """(?i)\s*(?:DESCRIBE|DESC)\s+AGGREGATES.*""".r ^^ {extractDescribeAggregatesCmd(_)}
-  private def describeMaterializedViews: Parser[DescribeMaterializedViewsCmd] = """(?i)\s*(?:DESCRIBE|DESC)\s+MATERIALIZED\s+VIEWS.*""".r ^^ {extractDescribeMaterializedViewsCmd(_)}
-  private def describeKeyspace: Parser[DescribeKeyspaceCmd] = """\s*(?i)(?:DESCRIBE|DESC)\s+KEYSPACE\s+.+""".r ^^ {extractDescribeKeyspaceCmd(_)}
-  private def describeTable: Parser[DescribeTableCmd] = """(?i)\s*(?:DESCRIBE|DESC)\s+TABLE\s+.+""".r ^^ {extractDescribeTableCmd(_)}
-  private def describeType: Parser[DescribeTypeCmd] = """(?i)\s*(?:DESCRIBE|DESC)\s+TYPE\s+.*""".r ^^ {extractDescribeTypeCmd(_)}
-  private def describeFunction: Parser[DescribeFunctionCmd] = """(?i)\s*(?:DESCRIBE|DESC)\s+FUNCTION\s+.*""".r ^^ {extractDescribeFunctionCmd(_)}
-  private def describeAggregate: Parser[DescribeAggregateCmd] = """(?i)\s*(?:DESCRIBE|DESC)\s+AGGREGATE\s+.*""".r ^^ {extractDescribeAggregateCmd(_)}
-  private def describeMaterializedView: Parser[DescribeMaterializedViewCmd] = """(?i)\s*(?:DESCRIBE|DESC)\s+MATERIALIZED\s+VIEW\s+.*""".r ^^ {extractDescribeMaterializedViewCmd(_)}
+  private def describeCluster: Parser[DescribeClusterCmd] = """(?i)\s*(?:DESCRIBE|DESC)\s+CLUSTER.*""".r ^^ extractDescribeClusterCmd
+  private def describeKeyspaces: Parser[DescribeKeyspacesCmd] = """(?i)\s*(?:DESCRIBE|DESC)\s+KEYSPACES.*""".r ^^ extractDescribeKeyspacesCmd
+  private def describeTables: Parser[DescribeTablesCmd] = """(?i)\s*(?:DESCRIBE|DESC)\s+TABLES.*""".r ^^ extractDescribeTablesCmd
+  private def describeTypes: Parser[DescribeTypesCmd] = """(?i)\s*(?:DESCRIBE|DESC)\s+TYPES.*""".r ^^ extractDescribeTypesCmd
+  private def describeFunctions: Parser[DescribeFunctionsCmd] = """(?i)\s*(?:DESCRIBE|DESC)\s+FUNCTIONS.*""".r ^^ extractDescribeFunctionsCmd
+  private def describeAggregates: Parser[DescribeAggregatesCmd] = """(?i)\s*(?:DESCRIBE|DESC)\s+AGGREGATES.*""".r ^^ extractDescribeAggregatesCmd
+  private def describeMaterializedViews: Parser[DescribeMaterializedViewsCmd] = """(?i)\s*(?:DESCRIBE|DESC)\s+MATERIALIZED\s+VIEWS.*""".r ^^ extractDescribeMaterializedViewsCmd
+  private def describeKeyspace: Parser[DescribeKeyspaceCmd] = """\s*(?i)(?:DESCRIBE|DESC)\s+KEYSPACE\s+.+""".r ^^ extractDescribeKeyspaceCmd
+  private def describeTable: Parser[DescribeTableCmd] = """(?i)\s*(?:DESCRIBE|DESC)\s+TABLE\s+.+""".r ^^ extractDescribeTableCmd
+  private def describeType: Parser[DescribeTypeCmd] = """(?i)\s*(?:DESCRIBE|DESC)\s+TYPE\s+.*""".r ^^ extractDescribeTypeCmd
+  private def describeFunction: Parser[DescribeFunctionCmd] = """(?i)\s*(?:DESCRIBE|DESC)\s+FUNCTION\s+.*""".r ^^ extractDescribeFunctionCmd
+  private def describeAggregate: Parser[DescribeAggregateCmd] = """(?i)\s*(?:DESCRIBE|DESC)\s+AGGREGATE\s+.*""".r ^^ extractDescribeAggregateCmd
+  private def describeMaterializedView: Parser[DescribeMaterializedViewCmd] = """(?i)\s*(?:DESCRIBE|DESC)\s+MATERIALIZED\s+VIEW\s+.*""".r ^^ extractDescribeMaterializedViewCmd
 
 
   //Help
-  private def helpCommand: Parser[HelpCmd] = """(?i)\s*HELP.*""".r ^^{extractHelpCmd(_)}
+  private def helpCommand: Parser[HelpCmd] = """(?i)\s*HELP.*""".r ^^ extractHelpCmd
 
   private def beginBatch: Parser[String] = """(?i)\s*BEGIN\s+(UNLOGGED|COUNTER)?\s*BATCH""".r
   private def applyBatch: Parser[String] = """(?i)APPLY BATCH;""".r
-  private def insert: Parser[SimpleStm] = """(?i)INSERT [^;]+;""".r ^^{SimpleStm(_)}
-  private def update: Parser[SimpleStm] = """(?i)UPDATE [^;]+;""".r ^^{SimpleStm(_)}
-  private def delete: Parser[SimpleStm] = """(?i)DELETE [^;]+;""".r ^^{SimpleStm(_)}
+  private def insert: Parser[SimpleStm] = """(?i)INSERT [^;]+;""".r ^^ SimpleStm
+  private def update: Parser[SimpleStm] = """(?i)UPDATE [^;]+;""".r ^^ SimpleStm
+  private def delete: Parser[SimpleStm] = """(?i)DELETE [^;]+;""".r ^^ SimpleStm
 
   private def mutationStatements: Parser[List[QueryStatement]] = rep(insert | update | delete | bind)
 
   def batch: Parser[BatchStm] = beginBatch ~ mutationStatements ~ applyBatch ^^ {
-    case begin ~ cqls ~ end => BatchStm(extractBatchType(begin),cqls)}
+    case begin ~ cqls ~ end => BatchStm(extractBatchType(begin), cqls)
+  }
 
   def queries:Parser[List[AnyBlock]] = rep(singleLineComment | multiLineComment | consistency | serialConsistency |
-    timestamp | retryPolicy | fetchSize | requestTimeOut | removePrepare | prepare | bind | batch | describeCluster |
+    timestamp | fetchSize | requestTimeOut | removePrepare | prepare | bind | batch | describeCluster |
     describeKeyspace | describeKeyspaces |
     describeTable | describeTables |
     describeType | describeTypes |
@@ -202,7 +201,7 @@ class ParagraphParser extends RegexParsers{
 
   def extractConsistency(text: String): Consistency = {
     text match {
-      case CONSISTENCY_LEVEL_PATTERN(consistency) => Consistency(ConsistencyLevel.valueOf(consistency))
+      case CONSISTENCY_LEVEL_PATTERN(consistency) => Consistency(DefaultConsistencyLevel.valueOf(consistency))
       case _ => throw new InterpreterException(s"Invalid syntax for @consistency. " +
         s"It should comply to the pattern ${CONSISTENCY_LEVEL_PATTERN.toString}")
     }
@@ -210,7 +209,8 @@ class ParagraphParser extends RegexParsers{
 
   def extractSerialConsistency(text: String): SerialConsistency = {
     text match {
-      case SERIAL_CONSISTENCY_LEVEL_PATTERN(consistency) => SerialConsistency(ConsistencyLevel.valueOf(consistency))
+      case SERIAL_CONSISTENCY_LEVEL_PATTERN(consistency) =>
+        SerialConsistency(DefaultConsistencyLevel.valueOf(consistency))
       case _ => throw new InterpreterException(s"Invalid syntax for @serialConsistency. " +
         s"It should comply to the pattern ${SERIAL_CONSISTENCY_LEVEL_PATTERN.toString}")
     }
@@ -224,24 +224,10 @@ class ParagraphParser extends RegexParsers{
     }
   }
 
-  def extractRetryPolicy(text: String): RetryPolicy = {
-    text match {
-      case RETRY_POLICIES_PATTERN(retry) => retry.trim match {
-        case DEFAULT_POLICY => DefaultRetryPolicy
-        case DOWNGRADING_CONSISTENCY_RETRY => DowngradingRetryPolicy
-        case FALLTHROUGH_RETRY => FallThroughRetryPolicy
-        case LOGGING_DEFAULT_RETRY => LoggingDefaultRetryPolicy
-        case LOGGING_DOWNGRADING_RETRY => LoggingDowngradingRetryPolicy
-        case LOGGING_FALLTHROUGH_RETRY => LoggingFallThroughRetryPolicy
-      }
-      case _ => throw new InterpreterException(s"Invalid syntax for @retryPolicy. " +
-        s"It should comply to the pattern ${RETRY_POLICIES_PATTERN.toString}")
-    }
-  }
-
   def extractFetchSize(text: String): FetchSize = {
     text match {
-      case FETCHSIZE_PATTERN(fetchSize) => FetchSize(fetchSize.trim.toInt)
+      case FETCHSIZE_PATTERN(fetchSize) =>
+        FetchSize(fetchSize.trim.toInt)
       case _ => throw new InterpreterException(s"Invalid syntax for @fetchSize. " +
         s"It should comply to the pattern ${FETCHSIZE_PATTERN.toString}")
     }
@@ -249,7 +235,8 @@ class ParagraphParser extends RegexParsers{
 
   def extractRequestTimeOut(text: String): RequestTimeOut = {
     text match {
-      case REQUEST_TIMEOUT_PATTERN(requestTimeOut) => RequestTimeOut(requestTimeOut.trim.toInt)
+      case REQUEST_TIMEOUT_PATTERN(requestTimeOut) =>
+        RequestTimeOut(requestTimeOut.trim.toInt)
       case _ => throw new InterpreterException(s"Invalid syntax for @requestTimeOut. " +
         s"It should comply to the pattern ${REQUEST_TIMEOUT_PATTERN.toString}")
     }
@@ -257,21 +244,24 @@ class ParagraphParser extends RegexParsers{
 
   def extractSimpleStatement(text: String): SimpleStm = {
     text match {
-      case SIMPLE_STATEMENT_PATTERN(statement) => SimpleStm(statement)
+      case SIMPLE_STATEMENT_PATTERN(statement) =>
+        SimpleStm(statement)
       case _ => throw new InterpreterException(s"Invalid statement '$text'. Did you forget to add ; (semi-colon) at the end of each CQL statement ?")
     }
   }
 
   def extractUdfStatement(text: String): SimpleStm = {
     text match {
-      case UDF_PATTERN(statement) => SimpleStm(statement)
+      case UDF_PATTERN(statement) =>
+        SimpleStm(statement)
       case _ => throw new InterpreterException(s"Invalid statement '$text' for UDF creation. Did you forget to add ; (semi-colon) at the end of each CQL statement ?")
     }
   }
 
   def extractPreparedStatement(text: String): PrepareStm = {
     text match {
-      case PREPARE_STATEMENT_PATTERN(name,queryString) => PrepareStm(name.trim,queryString.trim)
+      case PREPARE_STATEMENT_PATTERN(name,queryString) =>
+        PrepareStm(name.trim,queryString.trim)
       case _ => throw new InterpreterException(s"Invalid syntax for @prepare. " +
         s"It should comply to the pattern: @prepare[prepared_statement_name]=CQL Statement (without semi-colon)")
     }
@@ -279,7 +269,8 @@ class ParagraphParser extends RegexParsers{
 
   def extractRemovePreparedStatement(text: String): RemovePrepareStm= {
     text match {
-      case REMOVE_PREPARE_STATEMENT_PATTERN(name) => RemovePrepareStm(name.trim)
+      case REMOVE_PREPARE_STATEMENT_PATTERN(name) =>
+        RemovePrepareStm(name.trim)
       case _ => throw new InterpreterException(s"Invalid syntax for @remove_prepare. " +
         s"It should comply to the pattern: @remove_prepare[prepared_statement_name]")
     }
@@ -287,18 +278,19 @@ class ParagraphParser extends RegexParsers{
 
   def extractBoundStatement(text: String): BoundStm = {
     text match {
-      case BIND_PATTERN(name,boundValues) => BoundStm(name.trim, Option(boundValues).map(_.trim).getOrElse(""))
+      case BIND_PATTERN(name,boundValues) =>
+        BoundStm(name.trim, Option(boundValues).map(_.trim).getOrElse(""))
       case _ => throw new InterpreterException("Invalid syntax for @bind. It should comply to the pattern: " +
         "@bind[prepared_statement_name]=10,'jdoe','John DOE',12345,'2015-07-32 12:04:23.234' " +
         "OR @bind[prepared_statement_name] with no bound value. No semi-colon")
     }
   }
 
-  def extractBatchType(text: String): BatchStatement.Type = {
+  def extractBatchType(text: String): BatchType = {
     text match {
       case BATCH_PATTERN(batchType) =>
         val inferredType = Option(batchType).getOrElse("LOGGED")
-        BatchStatement.Type.valueOf(inferredType.toUpperCase)
+        DefaultBatchType.valueOf(inferredType.toUpperCase)
       case _ => throw new InterpreterException(s"Invalid syntax for BEGIN BATCH. " +
         s"""It should comply to the pattern: ${BATCH_PATTERN.toString}""")
     }
@@ -306,7 +298,8 @@ class ParagraphParser extends RegexParsers{
 
   def extractDescribeClusterCmd(text: String): DescribeClusterCmd = {
     text match {
-      case DESCRIBE_CLUSTER_PATTERN() => new DescribeClusterCmd
+      case DESCRIBE_CLUSTER_PATTERN() =>
+        new DescribeClusterCmd
       case _ => throw new InterpreterException(s"Invalid syntax for DESCRIBE CLUSTER. " +
         s"""It should comply to the pattern: ${DESCRIBE_CLUSTER_PATTERN.toString}""")
     }
@@ -314,7 +307,8 @@ class ParagraphParser extends RegexParsers{
 
   def extractDescribeKeyspaceCmd(text: String): DescribeKeyspaceCmd = {
     text match {
-      case DESCRIBE_KEYSPACE_PATTERN(keyspace) => new DescribeKeyspaceCmd(keyspace)
+      case DESCRIBE_KEYSPACE_PATTERN(keyspace) =>
+        DescribeKeyspaceCmd(keyspace)
       case _ => throw new InterpreterException(s"Invalid syntax for DESCRIBE KEYSPACE. " +
         s"""It should comply to the pattern: ${DESCRIBE_KEYSPACE_PATTERN.toString}""")
     }
@@ -322,7 +316,8 @@ class ParagraphParser extends RegexParsers{
 
   def extractDescribeKeyspacesCmd(text: String): DescribeKeyspacesCmd = {
     text match {
-        case DESCRIBE_KEYSPACES_PATTERN() => new DescribeKeyspacesCmd
+        case DESCRIBE_KEYSPACES_PATTERN() =>
+          new DescribeKeyspacesCmd
         case _ => throw new InterpreterException(s"Invalid syntax for DESCRIBE KEYSPACES. " +
           s"""It should comply to the pattern: ${DESCRIBE_KEYSPACES_PATTERN.toString}""")
       }
@@ -330,8 +325,10 @@ class ParagraphParser extends RegexParsers{
 
   def extractDescribeTableCmd(text: String): DescribeTableCmd = {
     text match {
-      case DESCRIBE_TABLE_WITH_KEYSPACE_PATTERN(keyspace,table) => new DescribeTableCmd(Option(keyspace),table)
-      case DESCRIBE_TABLE_PATTERN(table) => new DescribeTableCmd(Option.empty,table)
+      case DESCRIBE_TABLE_WITH_KEYSPACE_PATTERN(keyspace,table) =>
+        DescribeTableCmd(Option(keyspace),table)
+      case DESCRIBE_TABLE_PATTERN(table) =>
+        DescribeTableCmd(Option.empty,table)
       case _ => throw new InterpreterException(s"Invalid syntax for DESCRIBE TABLE. " +
         s"""It should comply to the patterns: ${DESCRIBE_TABLE_WITH_KEYSPACE_PATTERN.toString} or ${DESCRIBE_TABLE_PATTERN.toString}""".stripMargin)
     }
@@ -339,7 +336,8 @@ class ParagraphParser extends RegexParsers{
 
   def extractDescribeTablesCmd(text: String): DescribeTablesCmd = {
     text match {
-      case DESCRIBE_TABLES_PATTERN() => new DescribeTablesCmd
+      case DESCRIBE_TABLES_PATTERN() =>
+        new DescribeTablesCmd
       case _ => throw new InterpreterException(s"Invalid syntax for DESCRIBE TABLES. " +
         s"""It should comply to the pattern: ${DESCRIBE_TABLES_PATTERN.toString}""")
     }
@@ -347,8 +345,10 @@ class ParagraphParser extends RegexParsers{
 
   def extractDescribeTypeCmd(text: String): DescribeTypeCmd = {
     text match {
-      case DESCRIBE_TYPE_WITH_KEYSPACE_PATTERN(keyspace,table) => new DescribeTypeCmd(Option(keyspace),table)
-      case DESCRIBE_TYPE_PATTERN(table) => new DescribeTypeCmd(Option.empty,table)
+      case DESCRIBE_TYPE_WITH_KEYSPACE_PATTERN(keyspace,table) =>
+        DescribeTypeCmd(Option(keyspace),table)
+      case DESCRIBE_TYPE_PATTERN(table) =>
+        DescribeTypeCmd(Option.empty,table)
       case _ => throw new InterpreterException(s"Invalid syntax for DESCRIBE TYPE. " +
         s"""It should comply to the patterns: ${DESCRIBE_TYPE_WITH_KEYSPACE_PATTERN.toString} or ${DESCRIBE_TYPE_PATTERN.toString}""".stripMargin)
     }
@@ -356,7 +356,8 @@ class ParagraphParser extends RegexParsers{
 
   def extractDescribeTypesCmd(text: String): DescribeTypesCmd = {
     text match {
-      case DESCRIBE_TYPES_PATTERN() => new DescribeTypesCmd
+      case DESCRIBE_TYPES_PATTERN() =>
+        new DescribeTypesCmd
       case _ => throw new InterpreterException(s"Invalid syntax for DESCRIBE TYPES. " +
         s"""It should comply to the pattern: ${DESCRIBE_TYPES_PATTERN.toString}""")
     }
@@ -364,8 +365,10 @@ class ParagraphParser extends RegexParsers{
 
   def extractDescribeFunctionCmd(text: String): DescribeFunctionCmd = {
     text match {
-      case DESCRIBE_FUNCTION_WITH_KEYSPACE_PATTERN(keyspace,function) => new DescribeFunctionCmd(Option(keyspace),function)
-      case DESCRIBE_FUNCTION_PATTERN(function) => new DescribeFunctionCmd(Option.empty,function)
+      case DESCRIBE_FUNCTION_WITH_KEYSPACE_PATTERN(keyspace,function) =>
+        DescribeFunctionCmd(Option(keyspace),function)
+      case DESCRIBE_FUNCTION_PATTERN(function) =>
+        DescribeFunctionCmd(Option.empty,function)
       case _ => throw new InterpreterException(s"Invalid syntax for DESCRIBE FUNCTION. " +
         s"""It should comply to the patterns: ${DESCRIBE_FUNCTION_WITH_KEYSPACE_PATTERN.toString} or ${DESCRIBE_FUNCTION_PATTERN.toString}""".stripMargin)
     }
@@ -373,7 +376,8 @@ class ParagraphParser extends RegexParsers{
 
   def extractDescribeFunctionsCmd(text: String): DescribeFunctionsCmd = {
     text match {
-      case DESCRIBE_FUNCTIONS_PATTERN() => new DescribeFunctionsCmd
+      case DESCRIBE_FUNCTIONS_PATTERN() =>
+        new DescribeFunctionsCmd
       case _ => throw new InterpreterException(s"Invalid syntax for DESCRIBE FUNCTIONS. " +
         s"""It should comply to the pattern: ${DESCRIBE_FUNCTIONS_PATTERN.toString}""".stripMargin)
     }
@@ -381,8 +385,10 @@ class ParagraphParser extends RegexParsers{
 
   def extractDescribeAggregateCmd(text: String): DescribeAggregateCmd = {
     text match {
-      case DESCRIBE_AGGREGATE_WITH_KEYSPACE_PATTERN(keyspace,aggregate) => new DescribeAggregateCmd(Option(keyspace),aggregate)
-      case DESCRIBE_AGGREGATE_PATTERN(aggregate) => new DescribeAggregateCmd(Option.empty,aggregate)
+      case DESCRIBE_AGGREGATE_WITH_KEYSPACE_PATTERN(keyspace,aggregate) =>
+        DescribeAggregateCmd(Option(keyspace),aggregate)
+      case DESCRIBE_AGGREGATE_PATTERN(aggregate) =>
+        DescribeAggregateCmd(Option.empty,aggregate)
       case _ => throw new InterpreterException(s"Invalid syntax for DESCRIBE AGGREGATE. " +
         s"""It should comply to the patterns: ${DESCRIBE_AGGREGATE_WITH_KEYSPACE_PATTERN.toString} or ${DESCRIBE_AGGREGATE_PATTERN.toString}""".stripMargin)
     }
@@ -390,7 +396,8 @@ class ParagraphParser extends RegexParsers{
 
   def extractDescribeAggregatesCmd(text: String): DescribeAggregatesCmd = {
     text match {
-      case DESCRIBE_AGGREGATES_PATTERN() => new DescribeAggregatesCmd
+      case DESCRIBE_AGGREGATES_PATTERN() =>
+        new DescribeAggregatesCmd
       case _ => throw new InterpreterException(s"Invalid syntax for DESCRIBE AGGREGATES. " +
         s"""It should comply to the pattern: ${DESCRIBE_AGGREGATES_PATTERN.toString}""".stripMargin)
     }
@@ -398,8 +405,10 @@ class ParagraphParser extends RegexParsers{
 
   def extractDescribeMaterializedViewCmd(text: String): DescribeMaterializedViewCmd = {
     text match {
-      case DESCRIBE_MATERIALIZED_VIEW_WITH_KEYSPACE_PATTERN(keyspace,view) => new DescribeMaterializedViewCmd(Option(keyspace),view)
-      case DESCRIBE_MATERIALIZED_VIEW_PATTERN(view) => new DescribeMaterializedViewCmd(Option.empty,view)
+      case DESCRIBE_MATERIALIZED_VIEW_WITH_KEYSPACE_PATTERN(keyspace,view) =>
+        DescribeMaterializedViewCmd(Option(keyspace),view)
+      case DESCRIBE_MATERIALIZED_VIEW_PATTERN(view) =>
+        DescribeMaterializedViewCmd(Option.empty,view)
       case _ => throw new InterpreterException(s"Invalid syntax for DESCRIBE MATERIALIZED VIEW. " +
         s"""It should comply to the patterns: ${DESCRIBE_MATERIALIZED_VIEW_WITH_KEYSPACE_PATTERN.toString} or ${DESCRIBE_MATERIALIZED_VIEW_PATTERN.toString}""".stripMargin)
     }
@@ -407,7 +416,8 @@ class ParagraphParser extends RegexParsers{
 
   def extractDescribeMaterializedViewsCmd(text: String): DescribeMaterializedViewsCmd = {
     text match {
-      case DESCRIBE_MATERIALIZED_VIEWS_PATTERN() => new DescribeMaterializedViewsCmd
+      case DESCRIBE_MATERIALIZED_VIEWS_PATTERN() =>
+        new DescribeMaterializedViewsCmd
       case _ => throw new InterpreterException(s"Invalid syntax for DESCRIBE MATERIALIZED VIEWS. " +
         s"""It should comply to the pattern: ${DESCRIBE_MATERIALIZED_VIEWS_PATTERN.toString}""".stripMargin)
     }
@@ -415,7 +425,8 @@ class ParagraphParser extends RegexParsers{
 
   def extractHelpCmd(text: String): HelpCmd = {
     text match {
-      case HELP_PATTERN() => new HelpCmd
+      case HELP_PATTERN() =>
+        new HelpCmd
       case _ => throw new InterpreterException(s"Invalid syntax for HELP. " +
         s"""It should comply to the patterns: ${HELP_PATTERN.toString}""".stripMargin)
     }
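A note on the parser changes above: each @-parameter parser pairs an anchored, case-insensitive regex with an extract* function that pattern-matches on a single capture group and throws InterpreterException on mismatch. The following stdlib-only sketch illustrates that extraction style; the pattern here is a simplified, hypothetical stand-in, not the interpreter's actual CONSISTENCY_LEVEL_PATTERN:

```scala
import scala.util.matching.Regex

// Simplified, hypothetical version of the @consistency pattern:
// case-insensitive, one capture group for the level name.
val ConsistencyPattern: Regex = """^(?i)\s*@consistency\s*=\s*([A-Z_]+)\s*$""".r

def extractLevel(text: String): String = text match {
  // Regex.unapplySeq on a String requires the whole string to match
  case ConsistencyPattern(level) => level
  case _ => throw new IllegalArgumentException(s"Invalid syntax for @consistency: '$text'")
}

println(extractLevel("@consistency=QUORUM"))         // QUORUM
println(extractLevel("  @CONSISTENCY = local_one ")) // local_one
```

In the diff above, the captured level is then fed to DefaultConsistencyLevel.valueOf from the new driver instead of the old ConsistencyLevel.valueOf.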
diff --git a/cassandra/src/main/scala/org/apache/zeppelin/cassandra/TextBlockHierarchy.scala b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/TextBlockHierarchy.scala
index be55564..790d397 100644
--- a/cassandra/src/main/scala/org/apache/zeppelin/cassandra/TextBlockHierarchy.scala
+++ b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/TextBlockHierarchy.scala
@@ -17,6 +17,8 @@
 package org.apache.zeppelin.cassandra
 
 import com.datastax.driver.core._
+import com.datastax.oss.driver.api.core.ConsistencyLevel
+import com.datastax.oss.driver.api.core.cql.BatchType
 
 /**
  * Define a Scala object hierarchy
@@ -42,7 +44,6 @@ object TextBlockHierarchy {
   object ConsistencyParam extends ParameterType
   object SerialConsistencyParam extends ParameterType
   object TimestampParam extends ParameterType
-  object RetryPolicyParam extends ParameterType
   object FetchSizeParam extends ParameterType
   object RequestTimeOutParam extends ParameterType
 
@@ -63,15 +64,6 @@ object TextBlockHierarchy {
 
   case class RequestTimeOut(value: Int) extends QueryParameters(RequestTimeOutParam)
 
-  abstract class RetryPolicy extends QueryParameters(RetryPolicyParam)
-
-  object DefaultRetryPolicy extends RetryPolicy
-  object DowngradingRetryPolicy extends RetryPolicy
-  object FallThroughRetryPolicy extends RetryPolicy
-  object LoggingDefaultRetryPolicy extends RetryPolicy
-  object LoggingDowngradingRetryPolicy extends RetryPolicy
-  object LoggingFallThroughRetryPolicy extends RetryPolicy
-  
   sealed trait StatementType
   object PrepareStatementType extends StatementType
   object RemovePrepareStatementType extends StatementType
@@ -106,7 +98,7 @@ object TextBlockHierarchy {
   
   case class BoundStm(name: String, values:String) extends QueryStatement(BoundStatementType)
   
-  case class BatchStm(batchType: BatchStatement.Type, statements: List[QueryStatement])
+  case class BatchStm(batchType: BatchType, statements: List[QueryStatement])
     extends QueryStatement(BatchStatementType)
 
   sealed trait DescribeCommandStatement {
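The extractBatchType change above keeps the same defaulting rule (a bare BEGIN BATCH without UNLOGGED/COUNTER is treated as LOGGED) while switching the result type from the old driver's BatchStatement.Type to the new driver's BatchType. A self-contained sketch of that inference rule, using a local stand-in enum instead of the driver's DefaultBatchType:

```scala
// Local stand-in for com.datastax.oss.driver.api.core.cql.BatchType,
// used here only so the sketch runs without the driver on the classpath.
object BatchTypeDemo {
  sealed trait BatchType
  case object LOGGED extends BatchType
  case object UNLOGGED extends BatchType
  case object COUNTER extends BatchType

  // Mirrors the inference in extractBatchType: a missing keyword means LOGGED.
  // (The real code calls DefaultBatchType.valueOf, which throws on unknown names.)
  def infer(batchType: String): BatchType =
    Option(batchType).getOrElse("LOGGED").toUpperCase match {
      case "LOGGED"   => LOGGED
      case "UNLOGGED" => UNLOGGED
      case "COUNTER"  => COUNTER
    }
}

println(BatchTypeDemo.infer(null))       // LOGGED
println(BatchTypeDemo.infer("unlogged")) // UNLOGGED
```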
diff --git a/cassandra/src/main/scala/scala/compat/java8/OptionConverters.scala b/cassandra/src/main/scala/scala/compat/java8/OptionConverters.scala
new file mode 100644
index 0000000..49e06c2
--- /dev/null
+++ b/cassandra/src/main/scala/scala/compat/java8/OptionConverters.scala
@@ -0,0 +1,140 @@
+/*
+ * Scala (https://www.scala-lang.org)
+ *
+ * Copyright EPFL and Lightbend, Inc.
+ *
+ * Licensed under Apache License 2.0
+ * (http://www.apache.org/licenses/LICENSE-2.0).
+ *
+ * See the NOTICE file distributed with this work for
+ * additional information regarding copyright ownership.
+ */
+
+package scala.compat.java8
+
+import java.util.{Optional, OptionalDouble, OptionalInt, OptionalLong}
+
+/** This class enables bidirectional conversion between `scala.Option` and the
+  * set of `java.util.Optional` classes.
+  *
+  * The Scala `Option` is generic; its generic counterpart in Java is
+  * `java.util.Optional`.  `Option` is enriched with an `asJava` method, while
+  * `Optional` is enriched with `asScala` to perform conversions.
+  *
+  * In addition, both `Option` and `Optional` are enriched with `asPrimitive`
+  * methods that will convert generically contained primitives to the manually
+  * specialized Java versions for primitives, `OptionalDouble`, `OptionalInt`,
+  * and `OptionalLong`.  The primitive versions can be converted to the Scala
+  * generic `Option` with `asScala` and to the Java generic `Optional` with
+  * `asGeneric`.
+  *
+  * When calling from Java, methods are more convenient than extension methods,
+  * so `toJava` and `toScala` methods are provided that convert to and from
+  * Scala's `Option`.  Note that `toJava(toScala(x))` will result in a generic
+  * `Optional` even if `x` was one of the primitive versions.
+  *
+  * Example usage:
+  *
+  * {{{
+  * import scala.compat.java8.OptionConverters._
+  * val a = Option("example").asJava      // Creates java.util.Optional[String] containing "example"
+  * val b = (None: Option[String]).asJava // Creates an empty java.util.Optional[String]
+  * val c = a.asScala                     // Back to Option("example")
+  * val d = b.asScala                     // Back to None typed as Option[String]
+  * val e = Option(2.7).asJava            // java.util.Optional[Double] containing boxed 2.7
+  * val f = Option(2.7).asPrimitive       // java.util.OptionalDouble containing 2.7 (not boxed)
+  * val g = f.asScala                     // Back to Option(2.7)
+  * val h = f.asGeneric                   // Same as e
+  * val i = e.asPrimitive                 // Same as f
+  * val j = toJava(Option("example"))     // Same as a
+  * val k = toScala(a)                    // Same as c
+  * }}}
+  */
+object OptionConverters {
+  /** Type class implementing conversion from generic `Option` or `Optional` to manually specialized variants. */
+  sealed abstract class SpecializerOfOptions[A, That] {
+    /** Converts from `Optional` to a manually specialized variant `That` */
+    def fromJava(o: Optional[A]): That
+    /** Converts from `Option` to a manually specialized variant `That` */
+    def fromScala(o: Option[A]): That
+  }
+  
+  /** Implementation of creation of `OptionalDouble` from `Option[Double]` or `Optional[Double]`*/
+  implicit val specializer_OptionalDouble = new SpecializerOfOptions[Double, OptionalDouble] {
+    /** Creates an `OptionalDouble` from `Optional[Double]` */
+    def fromJava(o: Optional[Double]): OptionalDouble = if (o.isPresent) OptionalDouble.of(o.get) else OptionalDouble.empty
+    /** Creates an `OptionalDouble` from `Option[Double]` */
+    def fromScala(o: Option[Double]): OptionalDouble = o match { case Some(d) => OptionalDouble.of(d); case _ => OptionalDouble.empty }
+  }
+  
+  /** Implementation of creation of `OptionalInt` from `Option[Int]` or `Optional[Int]`*/
+  implicit val specializer_OptionalInt = new SpecializerOfOptions[Int, OptionalInt] {
+    /** Creates an `OptionalInt` from `Optional[Int]` */
+    def fromJava(o: Optional[Int]): OptionalInt = if (o.isPresent) OptionalInt.of(o.get) else OptionalInt.empty
+    /** Creates an `OptionalInt` from `Option[Int]` */
+    def fromScala(o: Option[Int]): OptionalInt = o match { case Some(d) => OptionalInt.of(d); case _ => OptionalInt.empty }
+  }
+  
+  /** Implementation of creation of `OptionalLong` from `Option[Long]` or `Optional[Long]`*/
+  implicit val specializer_OptionalLong = new SpecializerOfOptions[Long, OptionalLong] {
+    /** Creates an `OptionalLong` from `Optional[Long]` */
+    def fromJava(o: Optional[Long]): OptionalLong = if (o.isPresent) OptionalLong.of(o.get) else OptionalLong.empty
+    /** Creates an `OptionalLong` from `Option[Long]` */
+    def fromScala(o: Option[Long]): OptionalLong = o match { case Some(d) => OptionalLong.of(d); case _ => OptionalLong.empty }
+  }
+
+  /** Provides conversions from `java.util.Optional` to Scala `Option` or primitive `java.util.Optional` types */
+  implicit class RichOptionalGeneric[A](val underlying: java.util.Optional[A]) extends AnyVal {
+    /** Create a `scala.Option` version of this `Optional` */
+    def asScala: Option[A] = if (underlying.isPresent) Some(underlying.get) else None
+    /** Create a specialized primitive variant of this generic `Optional`, if an appropriate one exists */
+    def asPrimitive[That](implicit specOp: SpecializerOfOptions[A, That]): That = specOp.fromJava(underlying)
+  }
+  
+  /** Provides conversions from `scala.Option` to Java `Optional` types, either generic or primitive */
+  implicit class RichOptionForJava8[A](val underlying: Option[A]) extends AnyVal {
+    /** Create a `java.util.Optional` version of this `Option` (not specialized) */
+    def asJava: Optional[A] = underlying match { case Some(a) => Optional.ofNullable(a); case _ => Optional.empty[A] }
+    /** Create a specialized primitive `java.util.Optional` type, if an appropriate one exists */
+    def asPrimitive[That](implicit specOp: SpecializerOfOptions[A, That]): That = specOp.fromScala(underlying)
+  }
+  
+  /** Provides conversions from `java.util.OptionalDouble` to the generic `Optional` and Scala `Option` */
+  implicit class RichOptionalDouble(val underlying: OptionalDouble) extends AnyVal {
+    /** Create a `scala.Option` version of this `OptionalDouble` */
+    def asScala: Option[Double] = if (underlying.isPresent) Some(underlying.getAsDouble) else None
+    /** Create a generic `java.util.Optional` version of this `OptionalDouble` */
+    def asGeneric: Optional[Double] = if (underlying.isPresent) Optional.of(underlying.getAsDouble) else Optional.empty[Double]
+  }
+
+  /** Provides conversions from `java.util.OptionalInt` to the generic `Optional` and Scala `Option` */
+  implicit class RichOptionalInt(val underlying: OptionalInt) extends AnyVal {
+    /** Create a `scala.Option` version of this `OptionalInt` */
+    def asScala: Option[Int] = if (underlying.isPresent) Some(underlying.getAsInt) else None
+    /** Create a generic `java.util.Optional` version of this `OptionalInt` */
+    def asGeneric: Optional[Int] = if (underlying.isPresent) Optional.of(underlying.getAsInt) else Optional.empty[Int]
+  }
+
+  /** Provides conversions from `java.util.OptionalLong` to the generic `Optional` and Scala `Option` */
+  implicit class RichOptionalLong(val underlying: OptionalLong) extends AnyVal {
+    /** Create a `scala.Option` version of this `OptionalLong` */
+    def asScala: Option[Long] = if (underlying.isPresent) Some(underlying.getAsLong) else None
+    /** Create a generic `java.util.Optional` version of this `OptionalLong` */
+    def asGeneric: Optional[Long] = if (underlying.isPresent) Optional.of(underlying.getAsLong) else Optional.empty[Long]
+  }
+
+  /** Conversion from Scala `Option` to Java `Optional` without using implicits, for convenient use from Java. */
+  final def toJava[A](o: Option[A]): Optional[A] = o match { case Some(a) => Optional.ofNullable(a); case _ => Optional.empty[A] }
+
+  /** Conversion from Java `Optional` to Scala `Option` without using implicits, for convenient use from Java */
+  final def toScala[A](o: Optional[A]): Option[A] = if (o.isPresent) Some(o.get) else None
+
+  /** Conversion from Java `OptionalDouble` to Scala `Option` without using implicits, for convenient use from Java */
+  final def toScala(o: OptionalDouble): Option[Double] = if (o.isPresent) Some(o.getAsDouble) else None
+
+  /** Conversion from Java `OptionalInt` to Scala `Option` without using implicits, for convenient use from Java */
+  final def toScala(o: OptionalInt): Option[Int] = if (o.isPresent) Some(o.getAsInt) else None
+
+  /** Conversion from Java `OptionalLong` to Scala `Option` without using implicits, for convenient use from Java */
+  final def toScala(o: OptionalLong): Option[Long] = if (o.isPresent) Some(o.getAsLong) else None
+}
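The converters above bridge `scala.Option` and the `java.util` optional types in both directions. A minimal Java sketch of the null-handling contract they rely on — `Optional.ofNullable` (used by `asJava`/`toJava`) versus the NPE-throwing `Optional.of`, plus the unboxed primitive variant mirrored by `specializer_OptionalLong` (the class name `OptionalSketch` is illustrative, not part of this patch):

```java
import java.util.Optional;
import java.util.OptionalLong;

public class OptionalSketch {
    public static void main(String[] args) {
        // ofNullable maps null to empty — this is why asJava/toJava use it instead of of()
        Optional<String> none = Optional.ofNullable(null);
        Optional<String> some = Optional.ofNullable("x");
        System.out.println(none.isPresent()); // false
        System.out.println(some.get());       // x

        // Primitive variant avoids boxing; this is what asPrimitive specializes to
        OptionalLong ol = OptionalLong.of(42L);
        System.out.println(ol.getAsLong());               // 42
        System.out.println(OptionalLong.empty().isPresent()); // false
    }
}
```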
diff --git a/cassandra/src/test/java/org/apache/zeppelin/cassandra/CassandraInterpreterTest.java b/cassandra/src/test/java/org/apache/zeppelin/cassandra/CassandraInterpreterTest.java
index 0417dc9..8b40210 100644
--- a/cassandra/src/test/java/org/apache/zeppelin/cassandra/CassandraInterpreterTest.java
+++ b/cassandra/src/test/java/org/apache/zeppelin/cassandra/CassandraInterpreterTest.java
@@ -16,12 +16,32 @@
  */
 package org.apache.zeppelin.cassandra;
 
-import static com.google.common.collect.FluentIterable.from;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.mockito.Mockito.when;
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.internal.core.type.codec.TimestampCodec;
+import org.apache.zeppelin.display.AngularObjectRegistry;
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.InterpreterResult.Code;
+import org.cassandraunit.CQLDataLoader;
+import org.cassandraunit.dataset.cql.ClassPathCQLDataSet;
+import org.cassandraunit.utils.EmbeddedCassandraServerHelper;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Answers;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
 
-import static com.datastax.driver.core.ProtocolOptions.DEFAULT_MAX_SCHEMA_AGREEMENT_WAIT_SECONDS;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.time.Instant;
+import java.util.Properties;
 
 import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_CLUSTER_NAME;
 import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_COMPRESSION_PROTOCOL;
@@ -29,20 +49,12 @@ import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_CREDE
 import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_CREDENTIALS_USERNAME;
 import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_HOSTS;
 import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_LOAD_BALANCING_POLICY;
-import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_MAX_SCHEMA_AGREEMENT_WAIT_SECONDS;
-import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_POOLING_CORE_CONNECTION_PER_HOST_LOCAL;
-import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_POOLING_CORE_CONNECTION_PER_HOST_REMOTE;
 import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_POOLING_HEARTBEAT_INTERVAL_SECONDS;
-import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_POOLING_IDLE_TIMEOUT_SECONDS;
-import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_POOLING_MAX_CONNECTION_PER_HOST_LOCAL;
-import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_POOLING_MAX_CONNECTION_PER_HOST_REMOTE;
-import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_POOLING_MAX_REQUESTS_PER_CONNECTION_LOCAL;
-import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_POOLING_MAX_REQUESTS_PER_CONNECTION_REMOTE;
-import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_POOLING_NEW_CONNECTION_THRESHOLD_LOCAL;
-import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_POOLING_NEW_CONNECTION_THRESHOLD_REMOTE;
+import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_POOLING_CONNECTION_PER_HOST_LOCAL;
+import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_POOLING_CONNECTION_PER_HOST_REMOTE;
+import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_POOLING_MAX_REQUESTS_PER_CONNECTION;
 import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_POOLING_POOL_TIMEOUT_MILLIS;
 import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_PORT;
-import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_PROTOCOL_VERSION;
 import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_QUERY_DEFAULT_CONSISTENCY;
 import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_QUERY_DEFAULT_FETCH_SIZE;
 import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_QUERY_DEFAULT_SERIAL_CONSISTENCY;
@@ -52,80 +64,40 @@ import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_SOCKE
 import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_SOCKET_READ_TIMEOUT_MILLIS;
 import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_SOCKET_TCP_NO_DELAY;
 import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_SPECULATIVE_EXECUTION_POLICY;
-
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Answers;
-import org.mockito.Mock;
-import org.mockito.runners.MockitoJUnitRunner;
-
-import java.io.BufferedReader;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.util.Properties;
-
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.ProtocolVersion;
-import com.datastax.driver.core.Session;
-
-import info.archinnov.achilles.embedded.CassandraEmbeddedServerBuilder;
-
-import org.apache.zeppelin.display.AngularObjectRegistry;
-import org.apache.zeppelin.interpreter.Interpreter;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterResult;
-import org.apache.zeppelin.interpreter.InterpreterResult.Code;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.when;
 
 @RunWith(MockitoJUnitRunner.class)
-public class CassandraInterpreterTest {
+public class CassandraInterpreterTest {
   private static final String ARTISTS_TABLE = "zeppelin.artists";
+  private static final int DEFAULT_UNIT_TEST_PORT = 9142;
 
-  public static Session session = CassandraEmbeddedServerBuilder
-          .noEntityPackages()
-          .withKeyspaceName("zeppelin")
-          .withScript("prepare_schema.cql")
-          .withScript("prepare_data.cql")
-          .withProtocolVersion(ProtocolVersion.V3)
-          .buildNativeSessionOnly();
-
-  private static CassandraInterpreter interpreter;
+  private static volatile CassandraInterpreter interpreter;
 
   @Mock(answer = Answers.RETURNS_DEEP_STUBS)
   private InterpreterContext intrContext;
 
   @BeforeClass
-  public static void setUp() {
-    Properties properties = new Properties();
-    final Cluster cluster = session.getCluster();
+  public static synchronized void setUp() throws IOException, InterruptedException {
+    EmbeddedCassandraServerHelper.startEmbeddedCassandra();
+    CqlSession session = EmbeddedCassandraServerHelper.getSession();
+    new CQLDataLoader(session).load(new ClassPathCQLDataSet("prepare_all.cql", "zeppelin"));
 
-    properties.setProperty(CASSANDRA_CLUSTER_NAME, cluster.getClusterName());
+    Properties properties = new Properties();
+    properties.setProperty(CASSANDRA_CLUSTER_NAME, EmbeddedCassandraServerHelper.getClusterName());
     properties.setProperty(CASSANDRA_COMPRESSION_PROTOCOL, "NONE");
     properties.setProperty(CASSANDRA_CREDENTIALS_USERNAME, "none");
     properties.setProperty(CASSANDRA_CREDENTIALS_PASSWORD, "none");
 
-    properties.setProperty(CASSANDRA_PROTOCOL_VERSION, "3");
     properties.setProperty(CASSANDRA_LOAD_BALANCING_POLICY, "DEFAULT");
     properties.setProperty(CASSANDRA_RETRY_POLICY, "DEFAULT");
     properties.setProperty(CASSANDRA_RECONNECTION_POLICY, "DEFAULT");
     properties.setProperty(CASSANDRA_SPECULATIVE_EXECUTION_POLICY, "DEFAULT");
 
-    properties.setProperty(CASSANDRA_MAX_SCHEMA_AGREEMENT_WAIT_SECONDS,
-            DEFAULT_MAX_SCHEMA_AGREEMENT_WAIT_SECONDS + "");
-
-    properties.setProperty(CASSANDRA_POOLING_NEW_CONNECTION_THRESHOLD_LOCAL, "100");
-    properties.setProperty(CASSANDRA_POOLING_NEW_CONNECTION_THRESHOLD_REMOTE, "100");
-    properties.setProperty(CASSANDRA_POOLING_CORE_CONNECTION_PER_HOST_LOCAL, "2");
-    properties.setProperty(CASSANDRA_POOLING_CORE_CONNECTION_PER_HOST_REMOTE, "1");
-    properties.setProperty(CASSANDRA_POOLING_MAX_CONNECTION_PER_HOST_LOCAL, "8");
-    properties.setProperty(CASSANDRA_POOLING_MAX_CONNECTION_PER_HOST_REMOTE, "2");
-    properties.setProperty(CASSANDRA_POOLING_MAX_REQUESTS_PER_CONNECTION_LOCAL, "1024");
-    properties.setProperty(CASSANDRA_POOLING_MAX_REQUESTS_PER_CONNECTION_REMOTE, "256");
+    properties.setProperty(CASSANDRA_POOLING_CONNECTION_PER_HOST_LOCAL, "2");
+    properties.setProperty(CASSANDRA_POOLING_CONNECTION_PER_HOST_REMOTE, "1");
+    properties.setProperty(CASSANDRA_POOLING_MAX_REQUESTS_PER_CONNECTION, "1024");
 
-    properties.setProperty(CASSANDRA_POOLING_IDLE_TIMEOUT_SECONDS, "120");
     properties.setProperty(CASSANDRA_POOLING_POOL_TIMEOUT_MILLIS, "5000");
     properties.setProperty(CASSANDRA_POOLING_HEARTBEAT_INTERVAL_SECONDS, "30");
 
@@ -137,10 +109,9 @@ public class CassandraInterpreterTest {
     properties.setProperty(CASSANDRA_SOCKET_READ_TIMEOUT_MILLIS, "12000");
     properties.setProperty(CASSANDRA_SOCKET_TCP_NO_DELAY, "true");
 
-    properties.setProperty(CASSANDRA_HOSTS, from(cluster.getMetadata().getAllHosts()).first()
-            .get().getAddress().getHostAddress());
-    properties.setProperty(CASSANDRA_PORT, cluster.getConfiguration().getProtocolOptions()
-            .getPort() + "");
+    properties.setProperty(CASSANDRA_HOSTS, EmbeddedCassandraServerHelper.getHost());
+    properties.setProperty(CASSANDRA_PORT,
+            Integer.toString(EmbeddedCassandraServerHelper.getNativeTransportPort()));
     interpreter = new CassandraInterpreter(properties);
     interpreter.open();
   }
@@ -157,9 +128,6 @@ public class CassandraInterpreterTest {
 
   @Test
   public void should_create_cluster_and_session_upon_call_to_open() throws Exception {
-    assertThat(interpreter.cluster).isNotNull();
-    assertThat(interpreter.cluster.getClusterName()).isEqualTo(session.getCluster()
-            .getClusterName());
     assertThat(interpreter.session).isNotNull();
     assertThat(interpreter.helper).isNotNull();
   }
@@ -176,19 +144,21 @@ public class CassandraInterpreterTest {
     assertThat(actual).isNotNull();
     assertThat(actual.code()).isEqualTo(Code.SUCCESS);
     assertThat(actual.message().get(0).getData()).isEqualTo("name\tborn\tcountry\tdied\tgender\t" +
-            "styles\ttype\n" +
-            "Bogdan Raczynski\t1977-01-01\tPoland\tnull\tMale\t[Dance, Electro]\tPerson\n" +
-            "Krishna Das\t1947-05-31\tUSA\tnull\tMale\t[Unknown]\tPerson\n" +
-            "Sheryl Crow\t1962-02-11\tUSA\tnull\tFemale\t" +
-            "[Classic, Rock, Country, Blues, Pop, Folk]\tPerson\n" +
-            "Doof\t1968-08-31\tUnited Kingdom\tnull\tnull\t[Unknown]\tPerson\n" +
-            "House of Large Sizes\t1986-01-01\tUSA\t2003\tnull\t[Unknown]\tGroup\n" +
-            "Fanfarlo\t2006-01-01\tUnited Kingdom\tnull\tnull\t" +
-            "[Rock, Indie, Pop, Classic]\tGroup\n" +
-            "Jeff Beck\t1944-06-24\tUnited Kingdom\tnull\tMale\t[Rock, Pop, Classic]\tPerson\n" +
-            "Los Paranoias\tnull\tUnknown\tnull\tnull\t[Unknown]\tnull\n" +
-            "…And You Will Know Us by the Trail of Dead\t1994-01-01\tUSA\tnull\tnull\t" +
-            "[Rock, Pop, Classic]\tGroup\n");
+        "styles\ttype\n" +
+        "'Bogdan Raczynski'\t'1977-01-01'\t'Poland'\tnull\t'Male'\t" +
+        "['Dance','Electro']\t'Person'\n" +
+        "'Krishna Das'\t'1947-05-31'\t'USA'\tnull\t'Male'\t['Unknown']\t'Person'\n" +
+        "'Sheryl Crow'\t'1962-02-11'\t'USA'\tnull\t'Female'\t" +
+        "['Classic','Rock','Country','Blues','Pop','Folk']\t'Person'\n" +
+        "'Doof'\t'1968-08-31'\t'United Kingdom'\tnull\tnull\t['Unknown']\t'Person'\n" +
+        "'House of Large Sizes'\t'1986-01-01'\t'USA'\t'2003'\tnull\t['Unknown']\t'Group'\n" +
+        "'Fanfarlo'\t'2006-01-01'\t'United Kingdom'\tnull\tnull\t" +
+        "['Rock','Indie','Pop','Classic']\t'Group'\n" +
+        "'Jeff Beck'\t'1944-06-24'\t'United Kingdom'\tnull\t'Male'\t" +
+        "['Rock','Pop','Classic']\t'Person'\n" +
+        "'Los Paranoias'\tnull\t'Unknown'\tnull\tnull\t['Unknown']\tnull\n" +
+        "'…And You Will Know Us by the Trail of Dead'\t'1994-01-01'\t'USA'\tnull\tnull\t" +
+        "['Rock','Pop','Classic']\t'Group'\n");
   }
 
   @Test
@@ -203,9 +173,10 @@ public class CassandraInterpreterTest {
     assertThat(actual).isNotNull();
     assertThat(actual.code()).isEqualTo(Code.SUCCESS);
     assertThat(actual.message().get(0).getData())
-            .isEqualTo("name\tborn\tcountry\tdied\tgender\tstyles\ttype\n" +
-            "Bogdan Raczynski\t1977-01-01\tPoland\tnull\tMale\t[Dance, Electro]\tPerson\n" +
-            "Krishna Das\t1947-05-31\tUSA\tnull\tMale\t[Unknown]\tPerson\n");
+        .isEqualTo("name\tborn\tcountry\tdied\tgender\tstyles\ttype\n" +
+        "'Bogdan Raczynski'\t'1977-01-01'\t'Poland'\tnull\t'Male'\t" +
+        "['Dance','Electro']\t'Person'\n" +
+        "'Krishna Das'\t'1947-05-31'\t'USA'\tnull\t'Male'\t['Unknown']\t'Person'\n");
   }
 
   @Test
@@ -231,9 +202,9 @@ public class CassandraInterpreterTest {
     //Then
     assertThat(actual.code()).isEqualTo(Code.SUCCESS);
     assertThat(actual.message().get(0).getData()).isEqualTo("title\tartist\tyear\n" +
-            "The Impossible Dream EP\tCarter the Unstoppable Sex Machine\t1992\n" +
-            "The Way You Are\tTears for Fears\t1983\n" +
-            "Primitive\tSoulfly\t2003\n");
+            "'The Impossible Dream EP'\t'Carter the Unstoppable Sex Machine'\t1992\n" +
+            "'The Way You Are'\t'Tears for Fears'\t1983\n" +
+            "'Primitive'\t'Soulfly'\t2003\n");
   }
     
   @Test
@@ -262,8 +233,9 @@ public class CassandraInterpreterTest {
 
     //Then
     assertThat(actual.code()).isEqualTo(Code.ERROR);
+    String s = "line 1:9 mismatched input 'zeppelin' expecting K_FROM (SELECT * [zeppelin]...)";
     assertThat(actual.message().get(0).getData())
-            .contains("line 1:9 missing K_FROM at 'zeppelin' (SELECT * [zeppelin]....)");
+            .contains(s);
   }
 
   @Test
@@ -302,15 +274,22 @@ public class CassandraInterpreterTest {
     String statement2 = "@timestamp=15\n" +
             "INSERT INTO zeppelin.ts(key,val) VALUES('k','v2');";
 
+    CqlSession session = EmbeddedCassandraServerHelper.getSession();
     // Insert v1 with current timestamp
     interpreter.interpret(statement1, intrContext);
+    System.out.println("going to read data from zeppelin.ts;");
+    session.execute("SELECT val FROM zeppelin.ts LIMIT 1")
+            .forEach(x -> System.out.println("row " + x));
 
     Thread.sleep(1);
 
     //When
     // Insert v2 with past timestamp
     interpreter.interpret(statement2, intrContext);
-    final String actual = session.execute("SELECT * FROM zeppelin.ts LIMIT 1").one()
+    System.out.println("going to read data from zeppelin.ts;");
+    session.execute("SELECT val FROM zeppelin.ts LIMIT 1")
+            .forEach(x -> System.out.println("row " + x));
+    final String actual = session.execute("SELECT val FROM zeppelin.ts LIMIT 1").one()
             .getString("val");
 
     //Then
@@ -318,20 +297,6 @@ public class CassandraInterpreterTest {
   }
 
   @Test
-  public void should_execute_statement_with_retry_policy() throws Exception {
-    //Given
-    String statement = "@retryPolicy=" + interpreter.LOGGING_DOWNGRADING_RETRY + "\n" +
-            "@consistency=THREE\n" +
-            "SELECT * FROM zeppelin.artists LIMIT 1;";
-
-    //When
-    final InterpreterResult actual = interpreter.interpret(statement, intrContext);
-
-    //Then
-    assertThat(actual.code()).isEqualTo(Code.SUCCESS);
-  }
-
-  @Test
   public void should_execute_statement_with_request_timeout() throws Exception {
     //Given
     String statement = "@requestTimeOut=10000000\n" +
@@ -358,7 +323,7 @@ public class CassandraInterpreterTest {
     //Then
     assertThat(actual.code()).isEqualTo(Code.SUCCESS);
     assertThat(actual.message().get(0).getData()).isEqualTo("key\tval\n" +
-            "myKey\tmyValue\n");
+            "'myKey'\t'myValue'\n");
   }
 
   @Test
@@ -380,14 +345,14 @@ public class CassandraInterpreterTest {
     assertThat(actual.code()).isEqualTo(Code.SUCCESS);
     assertThat(actual.message().get(0).getData()).isEqualTo(
             "login\taddresses\tage\tdeceased\tfirstname\tlast_update\tlastname\tlocation\n" +
-                    "jdoe\t" +
+                    "'jdoe'\t" +
                     "{street_number:3,street_name:'Beverly Hills Bld',zip_code:90209," +
                     "country:'USA',extra_info:['Right on the hills','Next to the post box']," +
-                    "phone_numbers:{'office':2015790847,'home':2016778524}}\tnull\t" +
+                    "phone_numbers:{'home':2016778524,'office':2015790847}}\tnull\t" +
                     "null\t" +
-                    "John\t" +
+                    "'John'\t" +
                     "null\t" +
-                    "DOE\t" +
+                    "'DOE'\t" +
                     "('USA',90209,'Beverly Hills')\n");
   }
 
@@ -424,7 +389,7 @@ public class CassandraInterpreterTest {
     //Then
     assertThat(actual.code()).isEqualTo(Code.SUCCESS);
     assertThat(actual.message().get(0).getData()).isEqualTo("firstname\tlastname\tage\n" +
-            "Helen\tSUE\t27\n");
+            "'Helen'\t'SUE'\t27\n");
   }
 
   @Test
@@ -456,9 +421,9 @@ public class CassandraInterpreterTest {
     //Then
     assertThat(actual.code()).isEqualTo(Code.SUCCESS);
     assertThat(actual.message().get(0).getData()).isEqualTo("name\tcountry\tstyles\n" +
-            "Bogdan Raczynski\tPoland\t[Dance, Electro]\n" +
-            "Krishna Das\tUSA\t[Unknown]\n" +
-            "Sheryl Crow\tUSA\t[Classic, Rock, Country, Blues, Pop, Folk]\n");
+            "'Bogdan Raczynski'\t'Poland'\t['Dance','Electro']\n" +
+            "'Krishna Das'\t'USA'\t['Unknown']\n" +
+            "'Sheryl Crow'\t'USA'\t['Classic','Rock','Country','Blues','Pop','Folk']\n");
   }
 
   @Test
@@ -473,8 +438,9 @@ public class CassandraInterpreterTest {
 
     //Then
     assertThat(actual.code()).isEqualTo(Code.SUCCESS);
+    Instant tm = Instant.parse("2015-07-30T12:00:01Z");
     assertThat(actual.message().get(0).getData()).contains("last_update\n" +
-            "Thu Jul 30 12:00:01");
+            new TimestampCodec().format(tm));
   }
 
   @Test
@@ -490,7 +456,7 @@ public class CassandraInterpreterTest {
     //Then
     assertThat(actual.code()).isEqualTo(Code.SUCCESS);
     assertThat(actual.message().get(0).getData()).isEqualTo("firstname\tlastname\n" +
-            "null\tNULL\n");
+            "null\t'NULL'\n");
   }
 
   @Test
@@ -506,7 +472,7 @@ public class CassandraInterpreterTest {
     //Then
     assertThat(actual.code()).isEqualTo(Code.SUCCESS);
     assertThat(actual.message().get(0).getData()).isEqualTo("login\tdeceased\n" +
-            "bind_bool\tfalse\n");
+            "'bind_bool'\tfalse\n");
   }
 
   @Test
@@ -537,11 +503,8 @@ public class CassandraInterpreterTest {
 
     //When
     final InterpreterResult actual = interpreter.interpret(query, intrContext);
-    final Cluster cluster = session.getCluster();
-    final int port = cluster.getConfiguration().getProtocolOptions().getPort();
-    final String address = cluster.getMetadata().getAllHosts().iterator().next()
-            .getAddress().getHostAddress()
-            .replaceAll("/", "").replaceAll("\\[", "").replaceAll("\\]", "");
+    final int port = EmbeddedCassandraServerHelper.getNativeTransportPort();
+    final String address = EmbeddedCassandraServerHelper.getHost();
     //Then
     final String expected = rawResult.replaceAll("TRIED_HOSTS", address + ":" + port)
             .replaceAll("QUERIED_HOSTS", address + ":" + port);
@@ -560,7 +523,8 @@ public class CassandraInterpreterTest {
 
     //Then
     assertThat(actual.code()).isEqualTo(Code.ERROR);
-    assertThat(actual.message().get(0).getData()).contains("All host(s) tried for query failed");
+    assertThat(actual.message().get(0).getData())
+            .contains("All 1 node(s) tried for the query failed");
   }
 
   @Test
@@ -725,6 +689,37 @@ public class CassandraInterpreterTest {
   }
 
   @Test
+  public void should_describe_all_tables() throws Exception {
+    //Given
+    String query = "DESCRIBE TABLES;";
+    final String expected = reformatHtml(readTestResource(
+            "/scalate/DescribeTables.html"));
+
+    //When
+    final InterpreterResult actual = interpreter.interpret(query, intrContext);
+
+    //Then
+    assertThat(actual.code()).isEqualTo(Code.SUCCESS);
+    assertThat(reformatHtml(actual.message().get(0).getData())).isEqualTo(expected);
+  }
+
+  @Test
+  public void should_describe_all_udts() throws Exception {
+    //Given
+    String query = "DESCRIBE TYPES;";
+    final String expected = reformatHtml(readTestResource(
+            "/scalate/DescribeTypes.html"));
+
+    //When
+    final InterpreterResult actual = interpreter.interpret(query, intrContext);
+
+    //Then
+    assertThat(actual.code()).isEqualTo(Code.SUCCESS);
+    assertThat(reformatHtml(actual.message().get(0).getData())).isEqualTo(expected);
+  }
+
+
+  @Test
   public void should_error_describing_non_existing_table() throws Exception {
     //Given
     String query = "USE system;\n" +
diff --git a/cassandra/src/test/java/org/apache/zeppelin/cassandra/InterpreterLogicTest.java b/cassandra/src/test/java/org/apache/zeppelin/cassandra/InterpreterLogicTest.java
index 9d4c9ee..0ae802e 100644
--- a/cassandra/src/test/java/org/apache/zeppelin/cassandra/InterpreterLogicTest.java
+++ b/cassandra/src/test/java/org/apache/zeppelin/cassandra/InterpreterLogicTest.java
@@ -16,6 +16,12 @@
  */
 package org.apache.zeppelin.cassandra;
 
+import static com.datastax.oss.driver.api.core.ConsistencyLevel.ALL;
+import static com.datastax.oss.driver.api.core.ConsistencyLevel.LOCAL_SERIAL;
+import static com.datastax.oss.driver.api.core.ConsistencyLevel.ONE;
+import static com.datastax.oss.driver.api.core.ConsistencyLevel.QUORUM;
+import static com.datastax.oss.driver.api.core.ConsistencyLevel.SERIAL;
+import static com.datastax.oss.driver.api.core.cql.BatchType.UNLOGGED;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.Mockito.eq;
 import static org.mockito.Mockito.mock;
@@ -26,12 +32,11 @@ import static org.mockito.Mockito.when;
 
 import static java.util.Arrays.asList;
 
-import static com.datastax.driver.core.BatchStatement.Type.UNLOGGED;
-import static com.datastax.driver.core.ConsistencyLevel.ALL;
-import static com.datastax.driver.core.ConsistencyLevel.LOCAL_SERIAL;
-import static com.datastax.driver.core.ConsistencyLevel.ONE;
-import static com.datastax.driver.core.ConsistencyLevel.QUORUM;
-import static com.datastax.driver.core.ConsistencyLevel.SERIAL;
+import com.datastax.oss.driver.api.core.ConsistencyLevel;
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.cql.BatchStatement;
+import com.datastax.oss.driver.api.core.cql.BatchableStatement;
+import com.datastax.oss.driver.api.core.cql.SimpleStatement;
 
 import org.junit.Rule;
 import org.junit.Test;
@@ -43,27 +48,20 @@ import org.mockito.Captor;
 import org.mockito.Mock;
 import org.mockito.runners.MockitoJUnitRunner;
 
+import java.time.Instant;
+import java.time.ZoneOffset;
+import java.time.ZonedDateTime;
+import java.time.temporal.ChronoField;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Calendar;
-import java.util.Date;
 import java.util.List;
 
-import com.datastax.driver.core.BatchStatement;
-import com.datastax.driver.core.ConsistencyLevel;
-import com.datastax.driver.core.Session;
-import com.datastax.driver.core.SimpleStatement;
-import com.datastax.driver.core.Statement;
-
 import scala.Option;
 
 import org.apache.zeppelin.cassandra.TextBlockHierarchy.AnyBlock;
 import org.apache.zeppelin.cassandra.TextBlockHierarchy.Consistency;
-import org.apache.zeppelin.cassandra.TextBlockHierarchy.DowngradingRetryPolicy$;
-import org.apache.zeppelin.cassandra.TextBlockHierarchy.LoggingDefaultRetryPolicy$;
 import org.apache.zeppelin.cassandra.TextBlockHierarchy.QueryParameters;
 import org.apache.zeppelin.cassandra.TextBlockHierarchy.RequestTimeOut;
-import org.apache.zeppelin.cassandra.TextBlockHierarchy.RetryPolicy;
 import org.apache.zeppelin.cassandra.TextBlockHierarchy.SerialConsistency;
 import org.apache.zeppelin.cassandra.TextBlockHierarchy.SimpleStm;
 import org.apache.zeppelin.cassandra.TextBlockHierarchy.Timestamp;
@@ -82,7 +80,7 @@ public class InterpreterLogicTest {
   private InterpreterContext intrContext;
 
   @Mock
-  private Session session;
+  private CqlSession session;
 
   final InterpreterLogic helper = new InterpreterLogic(session);
 
@@ -268,19 +266,6 @@ public class InterpreterLogicTest {
   }
 
   @Test
-  public void should_extract_retry_policy_option() throws Exception {
-    //Given
-    List<QueryParameters> options = Arrays.<QueryParameters>asList(DowngradingRetryPolicy$.MODULE$,
-            LoggingDefaultRetryPolicy$.MODULE$);
-
-    //When
-    final CassandraQueryOptions actual = helper.extractQueryOptions(toScalaList(options));
-
-    //Then
-    assertThat(actual.retryPolicy().get()).isSameAs(DowngradingRetryPolicy$.MODULE$);
-  }
-
-  @Test
   public void should_extract_request_timeout_option() throws Exception {
     //Given
     List<QueryParameters> options = Arrays.<QueryParameters>asList(new RequestTimeOut(100));
@@ -299,7 +284,6 @@ public class InterpreterLogicTest {
     CassandraQueryOptions options = new CassandraQueryOptions(Option.apply(QUORUM),
             Option.<ConsistencyLevel>empty(),
             Option.empty(),
-            Option.<RetryPolicy>empty(),
             Option.empty(),
             Option.empty());
 
@@ -309,20 +293,20 @@ public class InterpreterLogicTest {
 
     //Then
     assertThat(actual).isNotNull();
-    assertThat(actual.getQueryString()).isEqualTo("SELECT * FROM users LIMIT 10;");
+    assertThat(actual.getQuery()).isEqualTo("SELECT * FROM users LIMIT 10;");
     assertThat(actual.getConsistencyLevel()).isSameAs(QUORUM);
   }
 
   @Test
   public void should_generate_batch_statement() throws Exception {
     //Given
-    Statement st1 = new SimpleStatement("SELECT * FROM users LIMIT 10;");
-    Statement st2 = new SimpleStatement("INSERT INTO users(id) VALUES(10);");
-    Statement st3 = new SimpleStatement("UPDATE users SET name = 'John DOE' WHERE id=10;");
+    SimpleStatement st1 = SimpleStatement.newInstance("SELECT * FROM users LIMIT 10;");
+    SimpleStatement st2 = SimpleStatement.newInstance("INSERT INTO users(id) VALUES(10);");
+    SimpleStatement st3 = SimpleStatement.newInstance(
+            "UPDATE users SET name = 'John DOE' WHERE id=10;");
     CassandraQueryOptions options = new CassandraQueryOptions(Option.apply(QUORUM),
             Option.<ConsistencyLevel>empty(),
             Option.empty(),
-            Option.<RetryPolicy>empty(),
             Option.empty(),
             Option.empty());
 
@@ -332,7 +316,10 @@ public class InterpreterLogicTest {
 
     //Then
     assertThat(actual).isNotNull();
-    final List<Statement> statements = new ArrayList<>(actual.getStatements());
+    List<BatchableStatement> statements = new ArrayList<>();
+    for (BatchableStatement b: actual) {
+      statements.add(b);
+    }
     assertThat(statements).hasSize(3);
     assertThat(statements.get(0)).isSameAs(st1);
     assertThat(statements.get(1)).isSameAs(st2);
@@ -359,18 +346,17 @@ public class InterpreterLogicTest {
     String dateString = "2015-07-30 12:00:01";
 
     //When
-    final Date actual = helper.parseDate(dateString);
+    final Instant actual = helper.parseDate(dateString);
 
     //Then
-    Calendar calendar = Calendar.getInstance();
-    calendar.setTime(actual);
-
-    assertThat(calendar.get(Calendar.YEAR)).isEqualTo(2015);
-    assertThat(calendar.get(Calendar.MONTH)).isEqualTo(Calendar.JULY);
-    assertThat(calendar.get(Calendar.DAY_OF_MONTH)).isEqualTo(30);
-    assertThat(calendar.get(Calendar.HOUR_OF_DAY)).isEqualTo(12);
-    assertThat(calendar.get(Calendar.MINUTE)).isEqualTo(0);
-    assertThat(calendar.get(Calendar.SECOND)).isEqualTo(1);
+    ZonedDateTime dt = actual.atZone(ZoneOffset.UTC);
+
+    assertThat(dt.getLong(ChronoField.YEAR_OF_ERA)).isEqualTo(2015);
+    assertThat(dt.getLong(ChronoField.MONTH_OF_YEAR)).isEqualTo(7);
+    assertThat(dt.getLong(ChronoField.DAY_OF_MONTH)).isEqualTo(30);
+    assertThat(dt.getLong(ChronoField.HOUR_OF_DAY)).isEqualTo(12);
+    assertThat(dt.getLong(ChronoField.MINUTE_OF_HOUR)).isEqualTo(0);
+    assertThat(dt.getLong(ChronoField.SECOND_OF_MINUTE)).isEqualTo(1);
   }
 
   @Test
@@ -379,19 +365,18 @@ public class InterpreterLogicTest {
     String dateString = "2015-07-30 12:00:01.123";
 
     //When
-    final Date actual = helper.parseDate(dateString);
+    final Instant actual = helper.parseDate(dateString);
 
     //Then
-    Calendar calendar = Calendar.getInstance();
-    calendar.setTime(actual);
-
-    assertThat(calendar.get(Calendar.YEAR)).isEqualTo(2015);
-    assertThat(calendar.get(Calendar.MONTH)).isEqualTo(Calendar.JULY);
-    assertThat(calendar.get(Calendar.DAY_OF_MONTH)).isEqualTo(30);
-    assertThat(calendar.get(Calendar.HOUR_OF_DAY)).isEqualTo(12);
-    assertThat(calendar.get(Calendar.MINUTE)).isEqualTo(0);
-    assertThat(calendar.get(Calendar.SECOND)).isEqualTo(1);
-    assertThat(calendar.get(Calendar.MILLISECOND)).isEqualTo(123);
+    ZonedDateTime dt = actual.atZone(ZoneOffset.UTC);
+
+    assertThat(dt.getLong(ChronoField.YEAR_OF_ERA)).isEqualTo(2015);
+    assertThat(dt.getLong(ChronoField.MONTH_OF_YEAR)).isEqualTo(7);
+    assertThat(dt.getLong(ChronoField.DAY_OF_MONTH)).isEqualTo(30);
+    assertThat(dt.getLong(ChronoField.HOUR_OF_DAY)).isEqualTo(12);
+    assertThat(dt.getLong(ChronoField.MINUTE_OF_HOUR)).isEqualTo(0);
+    assertThat(dt.getLong(ChronoField.SECOND_OF_MINUTE)).isEqualTo(1);
+    assertThat(dt.getLong(ChronoField.MILLI_OF_SECOND)).isEqualTo(123);
   }
 
   private <A> scala.collection.immutable.List<A> toScalaList(java.util.List<A> list)  {
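[Editor's note: the rewritten tests above expect `helper.parseDate` to return a `java.time.Instant` instead of `java.util.Date`. A minimal, self-contained sketch of a parser consistent with both assertion blocks (the class name `DateParseSketch` is illustrative, not from the commit; UTC is assumed, matching `actual.atZone(ZoneOffset.UTC)` in the tests) could look like:]

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.temporal.ChronoField;

public class DateParseSketch {
  // Accepts both "yyyy-MM-dd HH:mm:ss" and "yyyy-MM-dd HH:mm:ss.SSS",
  // as exercised by the two tests above.
  private static final DateTimeFormatter FMT = new DateTimeFormatterBuilder()
      .appendPattern("yyyy-MM-dd HH:mm:ss")
      .optionalStart()
      // optional ".SSS" fractional part, exactly 3 digits, with leading '.'
      .appendFraction(ChronoField.MILLI_OF_SECOND, 3, 3, true)
      .optionalEnd()
      .toFormatter();

  public static Instant parseDate(String s) {
    // Interpret the wall-clock string as UTC and convert to an Instant
    return LocalDateTime.parse(s, FMT).toInstant(ZoneOffset.UTC);
  }

  public static void main(String[] args) {
    System.out.println(parseDate("2015-07-30 12:00:01.123"));
  }
}
```

[This mirrors what the assertions check: year 2015, month 7, day 30, 12:00:01 and, when present, 123 milliseconds, all read back via `ChronoField` accessors.]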
diff --git a/cassandra/src/test/resources/application.conf b/cassandra/src/test/resources/application.conf
new file mode 100644
index 0000000..961261a
--- /dev/null
+++ b/cassandra/src/test/resources/application.conf
@@ -0,0 +1,3 @@
+datastax-java-driver {
+  advanced.request.warn-if-set-keyspace = false
+}
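[Editor's note: unlike driver 3.x, the 4.x driver is configured through a Typesafe-Config file (`application.conf` overriding the driver's `reference.conf`), which is why this new test resource exists. A slightly fuller sketch of such a file (the contact-point and datacenter values here are illustrative, not part of the commit):]

```
datastax-java-driver {
  basic.contact-points = ["127.0.0.1:9042"]
  basic.load-balancing-policy.local-datacenter = "datacenter1"
  # suppress the warning the tests would otherwise trigger via USE <keyspace>
  advanced.request.warn-if-set-keyspace = false
}
```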
diff --git a/cassandra/src/test/resources/prepare_schema.cql b/cassandra/src/test/resources/prepare_all.cql
similarity index 63%
rename from cassandra/src/test/resources/prepare_schema.cql
rename to cassandra/src/test/resources/prepare_all.cql
index bdf9773..ec40293 100644
--- a/cassandra/src/test/resources/prepare_schema.cql
+++ b/cassandra/src/test/resources/prepare_all.cql
@@ -111,4 +111,23 @@ CREATE TABLE IF NOT EXISTS live_data.stations (
     station_id uuid,
     sensors frozen<map<uuid,geolocation>>,
     PRIMARY KEY (station_id)
-);
\ No newline at end of file
+);
+
+-- insert data
+TRUNCATE zeppelin.artists;
+
+INSERT INTO zeppelin.artists(name,born,country,died,gender,styles,type) VALUES('Bogdan Raczynski','1977-01-01','Poland',null,'Male',['Dance', 'Electro'],'Person');
+INSERT INTO zeppelin.artists(name,born,country,died,gender,styles,type) VALUES('Krishna Das','1947-05-31','USA',null,'Male',['Unknown'],'Person');
+INSERT INTO zeppelin.artists(name,born,country,died,gender,styles,type) VALUES('Sheryl Crow','1962-02-11','USA',null,'Female',['Classic', 'Rock', 'Country', 'Blues', 'Pop', 'Folk'],'Person');
+INSERT INTO zeppelin.artists(name,born,country,died,gender,styles,type) VALUES('Doof','1968-08-31','United Kingdom',null,null,['Unknown'],'Person');
+INSERT INTO zeppelin.artists(name,born,country,died,gender,styles,type) VALUES('House of Large Sizes','1986-01-01','USA','2003',null,['Unknown'],'Group');
+INSERT INTO zeppelin.artists(name,born,country,died,gender,styles,type) VALUES('Fanfarlo','2006-01-01','United Kingdom',null,null,['Rock', 'Indie', 'Pop', 'Classic'],'Group');
+INSERT INTO zeppelin.artists(name,born,country,died,gender,styles,type) VALUES('Jeff Beck','1944-06-24','United Kingdom',null,'Male',['Rock', 'Pop', 'Classic'],'Person');
+INSERT INTO zeppelin.artists(name,born,country,died,gender,styles,type) VALUES('Los Paranoias',null,'Unknown',null,null,['Unknown'],null);
+INSERT INTO zeppelin.artists(name,born,country,died,gender,styles,type) VALUES('…And You Will Know Us by the Trail of Dead','1994-01-01','USA',null,null,['Rock', 'Pop', 'Classic'],'Group');
+
+TRUNCATE zeppelin.ts;
+
+TRUNCATE zeppelin.prepared;
+
+TRUNCATE zeppelin.users;
diff --git a/cassandra/src/test/resources/prepare_data.cql b/cassandra/src/test/resources/prepare_data.cql
deleted file mode 100644
index 590a97c..0000000
--- a/cassandra/src/test/resources/prepare_data.cql
+++ /dev/null
@@ -1,18 +0,0 @@
-
-TRUNCATE zeppelin.artists;
-
-INSERT INTO zeppelin.artists(name,born,country,died,gender,styles,type) VALUES('Bogdan Raczynski','1977-01-01','Poland',null,'Male',['Dance', 'Electro'],'Person');
-INSERT INTO zeppelin.artists(name,born,country,died,gender,styles,type) VALUES('Krishna Das','1947-05-31','USA',null,'Male',['Unknown'],'Person');
-INSERT INTO zeppelin.artists(name,born,country,died,gender,styles,type) VALUES('Sheryl Crow','1962-02-11','USA',null,'Female',['Classic', 'Rock', 'Country', 'Blues', 'Pop', 'Folk'],'Person');
-INSERT INTO zeppelin.artists(name,born,country,died,gender,styles,type) VALUES('Doof','1968-08-31','United Kingdom',null,null,['Unknown'],'Person');
-INSERT INTO zeppelin.artists(name,born,country,died,gender,styles,type) VALUES('House of Large Sizes','1986-01-01','USA','2003',null,['Unknown'],'Group');
-INSERT INTO zeppelin.artists(name,born,country,died,gender,styles,type) VALUES('Fanfarlo','2006-01-01','United Kingdom',null,null,['Rock', 'Indie', 'Pop', 'Classic'],'Group');
-INSERT INTO zeppelin.artists(name,born,country,died,gender,styles,type) VALUES('Jeff Beck','1944-06-24','United Kingdom',null,'Male',['Rock', 'Pop', 'Classic'],'Person');
-INSERT INTO zeppelin.artists(name,born,country,died,gender,styles,type) VALUES('Los Paranoias',null,'Unknown',null,null,['Unknown'],null);
-INSERT INTO zeppelin.artists(name,born,country,died,gender,styles,type) VALUES('…And You Will Know Us by the Trail of Dead','1994-01-01','USA',null,null,['Rock', 'Pop', 'Classic'],'Group');
-
-TRUNCATE zeppelin.ts;
-
-TRUNCATE zeppelin.prepared;
-
-TRUNCATE zeppelin.users;
diff --git a/cassandra/src/test/resources/scalate/DescribeKeyspace_live_data.html b/cassandra/src/test/resources/scalate/DescribeKeyspace_live_data.html
index 344780a..22086f5 100644
--- a/cassandra/src/test/resources/scalate/DescribeKeyspace_live_data.html
+++ b/cassandra/src/test/resources/scalate/DescribeKeyspace_live_data.html
@@ -1 +1 @@
-<br/><br/><nav class="navbar navbar-default"><ul class="nav navbar-nav"><li role="presentation" class="dropdown"><a class="dropdown-toggle" data-toggle="dropdown" role="button" aria-haspopup="true" aria-expanded="false"><span class="text-danger"><i class="glyphicon glyphicon-folder-open"/>&nbsp;&nbsp;<strong>live_data</strong></span><span class="text-danger caret"></span><ul class="dropdown-menu"><li class="dropdown-header"><span class="text-primary">Tables</span></li><li><a role="button [...]
\ No newline at end of file
+<br/><br/><nav class="navbar navbar-default"><ul class="nav navbar-nav"><li role="presentation" class="dropdown"><a class="dropdown-toggle" data-toggle="dropdown" role="button" aria-haspopup="true" aria-expanded="false"><span class="text-danger"><i class="glyphicon glyphicon-folder-open"/>&nbsp;&nbsp;<strong>live_data</strong></span><span class="text-danger caret"></span><ul class="dropdown-menu"><li class="dropdown-header"><span class="text-primary">Tables</span></li><li><a role="button [...]
diff --git a/cassandra/src/test/resources/scalate/DescribeKeyspaces.html b/cassandra/src/test/resources/scalate/DescribeKeyspaces.html
index b5c364e..76a8ed9 100644
--- a/cassandra/src/test/resources/scalate/DescribeKeyspaces.html
+++ b/cassandra/src/test/resources/scalate/DescribeKeyspaces.html
@@ -1 +1 @@
-<br/><br/><nav class="navbar navbar-default"><ul class="nav navbar-nav"><li role="presentation" class="dropdown"><a class="dropdown-toggle" data-toggle="dropdown" role="button" aria-haspopup="true" aria-expanded="false"><span class="text-muted"><i class="glyphicon glyphicon-dashboard"/>&nbsp;<strong>Test Cluster</strong></span><span class="text-muted caret"></span><ul class="dropdown-menu"><li class="dropdown-header"><span class="text-danger">Keyspaces</span></li><li><a role="button" dat [...]
\ No newline at end of file
+<br/><br/><nav class="navbar navbar-default"><ul class="nav navbar-nav"><li role="presentation" class="dropdown"><a class="dropdown-toggle" data-toggle="dropdown" role="button" aria-haspopup="true" aria-expanded="false"><span class="text-muted"><i class="glyphicon glyphicon-dashboard"/>&nbsp;<strong>Test Cluster</strong></span><span class="text-muted caret"></span><ul class="dropdown-menu"><li class="dropdown-header"><span class="text-danger">Keyspaces</span></li><li><a role="button" dat [...]
\ No newline at end of file
diff --git a/cassandra/src/test/resources/scalate/DescribeTable_live_data_complex_table.html b/cassandra/src/test/resources/scalate/DescribeTable_live_data_complex_table.html
index fd3ee67..e6be8b4 100644
--- a/cassandra/src/test/resources/scalate/DescribeTable_live_data_complex_table.html
+++ b/cassandra/src/test/resources/scalate/DescribeTable_live_data_complex_table.html
@@ -1 +1 @@
-<br/><br/><nav class="navbar navbar-default"><ul class="nav navbar-nav"><li><a><strong>DESCRIBE TABLE live_data.complex_table;</strong></a></li></ul><ul class="nav navbar-nav navbar-right"><li class="dropdown"><a class="dropdown-toggle" data-toggle="dropdown" role="button" aria-haspopup="true" aria-expanded="false"><strong>Legend</strong><span class="caret"></span></a><ul class="dropdown-menu"><li><a role="button"><i class="glyphicon glyphicon-dashboard text-muted" />&nbsp;Cluster</a></l [...]
\ No newline at end of file
+<br/><br/><nav class="navbar navbar-default"><ul class="nav navbar-nav"><li><a><strong>DESCRIBE TABLE live_data.complex_table;</strong></a></li></ul><ul class="nav navbar-nav navbar-right"><li class="dropdown"><a class="dropdown-toggle" data-toggle="dropdown" role="button" aria-haspopup="true" aria-expanded="false"><strong>Legend</strong><span class="caret"></span></a><ul class="dropdown-menu"><li><a role="button"><i class="glyphicon glyphicon-dashboard text-muted" />&nbsp;Cluster</a></l [...]
diff --git a/cassandra/src/test/resources/scalate/DescribeTables.html b/cassandra/src/test/resources/scalate/DescribeTables.html
index cf69403..b6bfcba 100644
--- a/cassandra/src/test/resources/scalate/DescribeTables.html
+++ b/cassandra/src/test/resources/scalate/DescribeTables.html
@@ -1,313 +1,477 @@
 <br/>
 <br/>
 <nav class="navbar navbar-default">
-    <ul class="nav navbar-nav">
-
-        <li role="presentation" class="dropdown">
-            <a class="dropdown-toggle" data-toggle="dropdown" role="button" aria-haspopup="true" aria-expanded="false">
-                <span class="text-muted"><i class="glyphicon glyphicon-dashboard"/>&nbsp;<strong>Test Cluster</strong></span>
-                <span class="text-muted caret"></span>
-                <ul class="dropdown-menu">
-                    <li class="dropdown-header"><span class="text-danger">Keyspaces</span></li>
-                    <li>
-                        <a role="button" data-toggle="collapse" data-target="#d0b77780-3518-11e5-86fc-8f0ea8ae1a37">
-                            <span class="text-danger"><i class="glyphicon glyphicon-folder-open"/>&nbsp;&nbsp;live_data</span>
-                        </a>
-                    </li>
-                    <li>
-                        <a role="button" data-toggle="collapse" data-target="#d0b7ecb0-3518-11e5-86fc-8f0ea8ae1a37">
-                            <span class="text-danger"><i class="glyphicon glyphicon-folder-open"/>&nbsp;&nbsp;samples</span>
-                        </a>
-                    </li>
-                    <li>
-                        <a role="button" data-toggle="collapse" data-target="#d0b7ecb1-3518-11e5-86fc-8f0ea8ae1a37">
-                            <span class="text-danger"><i class="glyphicon glyphicon-folder-open"/>&nbsp;&nbsp;system</span>
-                        </a>
-                    </li>
-                    <li>
-                        <a role="button" data-toggle="collapse" data-target="#d0b813c0-3518-11e5-86fc-8f0ea8ae1a37">
-                            <span class="text-danger"><i class="glyphicon glyphicon-folder-open"/>&nbsp;&nbsp;system_traces</span>
-                        </a>
-                    </li>
-                    <li>
-                        <a role="button" data-toggle="collapse" data-target="#d0b813c1-3518-11e5-86fc-8f0ea8ae1a37">
-                            <span class="text-danger"><i class="glyphicon glyphicon-folder-open"/>&nbsp;&nbsp;zeppelin</span>
-                        </a>
-                    </li>
-                </ul>
+  <ul class="nav navbar-nav">
+    <li role="presentation" class="dropdown">
+      <a class="dropdown-toggle" data-toggle="dropdown" role="button" aria-haspopup="true" aria-expanded="false">
+        <span class="text-muted">
+          <i class="glyphicon glyphicon-dashboard"/>&nbsp;<strong>Test Cluster</strong>
+        </span>
+        <span class="text-muted caret">
+        </span>
+        <ul class="dropdown-menu">
+          <li class="dropdown-header">
+            <span class="text-danger">Keyspaces</span>
+          </li>
+          <li>
+            <a role="button" data-toggle="collapse" >
+              <span class="text-danger">
+                <i class="glyphicon glyphicon-folder-open"/>&nbsp;&nbsp;live_data</span>
             </a>
+          </li>
+          <li>
+            <a role="button" data-toggle="collapse" >
+              <span class="text-danger">
+                <i class="glyphicon glyphicon-folder-open"/>&nbsp;&nbsp;system</span>
+            </a>
+          </li>
+          <li>
+            <a role="button" data-toggle="collapse" >
+              <span class="text-danger">
+                <i class="glyphicon glyphicon-folder-open"/>&nbsp;&nbsp;system_auth</span>
+            </a>
+          </li>
+          <li>
+            <a role="button" data-toggle="collapse" >
+              <span class="text-danger">
+                <i class="glyphicon glyphicon-folder-open"/>&nbsp;&nbsp;system_distributed</span>
+            </a>
+          </li>
+          <li>
+            <a role="button" data-toggle="collapse" >
+              <span class="text-danger">
+                <i class="glyphicon glyphicon-folder-open"/>&nbsp;&nbsp;system_schema</span>
+            </a>
+          </li>
+          <li>
+            <a role="button" data-toggle="collapse" >
+              <span class="text-danger">
+                <i class="glyphicon glyphicon-folder-open"/>&nbsp;&nbsp;system_traces</span>
+            </a>
+          </li>
+          <li>
+            <a role="button" data-toggle="collapse" >
+              <span class="text-danger">
+                <i class="glyphicon glyphicon-folder-open"/>&nbsp;&nbsp;zeppelin</span>
+            </a>
+          </li>
+        </ul>
+      </a>
+    </li>
+    <li>
+      <a>
+        <strong>DESCRIBE TABLES;</strong>
+      </a>
+    </li>
+  </ul>
+  <ul class="nav navbar-nav navbar-right">
+    <li class="dropdown">
+      <a class="dropdown-toggle" data-toggle="dropdown" role="button" aria-haspopup="true" aria-expanded="false">
+        <strong>Legend</strong>
+        <span class="caret">
+        </span>
+      </a>
+      <ul class="dropdown-menu">
+        <li>
+          <a role="button">
+            <i class="glyphicon glyphicon-dashboard text-muted" />&nbsp;Cluster</a>
         </li>
-
         <li>
-            <a><strong>DESCRIBE TABLES;</strong></a>
+          <a role="button">
+            <i class="glyphicon glyphicon-folder-open text-danger" />&nbsp;&nbsp;Keyspace</a>
         </li>
-    </ul>
-    <ul class="nav navbar-nav navbar-right">
-        <li class="dropdown">
-            <a class="dropdown-toggle" data-toggle="dropdown" role="button" aria-haspopup="true" aria-expanded="false">
-                <strong>Legend</strong>
-                <span class="caret"></span>
-            </a>
-            <ul class="dropdown-menu">
-                <li>
-                    <a role="button">
-                        <i class="glyphicon glyphicon-dashboard text-muted" />&nbsp;Cluster
-                    </a>
-                </li>
-                <li>
-                    <a role="button">
-                        <i class="glyphicon glyphicon-folder-open text-danger" />&nbsp;&nbsp;Keyspace
-                    </a>
-                </li>
-                <li>
-                    <a role="button">
-                        <i class="glyphicon glyphicon-copyright-mark text-warning" />&nbsp;&nbsp;UDT
-                    </a>
-                </li>
-                <li>
-                    <a role="button">
-                        <i class="glyphicon glyphicon-th-list text-primary" />&nbsp;&nbsp;Table
-                    </a>
-                </li>
-                <li class="bg-info">
-                    <a role="button">
-                        <i class="glyphicon glyphicon-fullscreen" />&nbsp;&nbsp;Partition Key
-                    </a>
-                </li>
-                <li class="bg-warning">
-                    <a role="button">
-                        <i class="glyphicon glyphicon-pushpin" />&nbsp;&nbsp;Static Column
-                    </a>
-                </li>
-                <li class="bg-success">
-                    <a role="button">
-                        <i class="glyphicon glyphicon-sort" />&nbsp;&nbsp;Clustering Column
-                    </a>
-                </li>
-                <li class="bg-success">
-                    <a role="button">
-                        <i class="glyphicon glyphicon-sort-by-attributes" />&nbsp;&nbsp;Clustering Order ASC
-                    </a>
-                </li>
-                <li class="bg-success">
-                    <a role="button">
-                        <i class="glyphicon glyphicon-sort-by-attributes-alt" />&nbsp;&nbsp;Clustering Order DESC
-                    </a>
-                </li>
-                <li>
-                    <a role="button">
-                        <i class="glyphicon glyphicon-info-sign" />&nbsp;&nbsp;Indexed Column
-                    </a>
-                </li>
-            </ul>
+        <li>
+          <a role="button">
+            <i class="glyphicon glyphicon-copyright-mark text-warning" />&nbsp;&nbsp;UDT</a>
+        </li>
+        <li>
+          <a role="button">
+            <i class="glyphicon glyphicon-th-list text-primary" />&nbsp;&nbsp;Table</a>
         </li>
         <li>
-            <a href="#"></a>
+          <a role="button">
+            <i class="glyphicon glyphicon-eye-open text-primary" />&nbsp;&nbsp;Materialized View</a>
+        </li>
+        <li>
+          <a role="button">
+            <i class="glyphicon glyphicon-random text-success" />&nbsp;&nbsp;Function</a>
+        </li>
+        <li>
+          <a role="button">
+            <i class="glyphicon glyphicon-retweet text-success" />&nbsp;&nbsp;Aggregate</a>
+        </li>
+        <li role="separator" class="divider text-muted">
+        </li>
+        <li class="dropdown-header">
+          <span class="text-primary">Table icons</span>
+        </li>
+        <li class="bg-info">
+          <a role="button">
+            <i class="glyphicon glyphicon-fullscreen" />&nbsp;&nbsp;Partition Key</a>
+        </li>
+        <li class="bg-warning">
+          <a role="button">
+            <i class="glyphicon glyphicon-pushpin" />&nbsp;&nbsp;Static Column</a>
+        </li>
+        <li class="bg-success">
+          <a role="button">
+            <i class="glyphicon glyphicon-sort" />&nbsp;&nbsp;Clustering Column</a>
+        </li>
+        <li class="bg-success">
+          <a role="button">
+            <i class="glyphicon glyphicon-sort-by-attributes" />&nbsp;&nbsp;Clustering Order ASC</a>
+        </li>
+        <li class="bg-success">
+          <a role="button">
+            <i class="glyphicon glyphicon-sort-by-attributes-alt" />&nbsp;&nbsp;Clustering Order DESC</a>
         </li>
-    </ul>
+      </ul>
+    </li>
+    <li>
+      <a href="#">
+      </a>
+    </li>
+  </ul>
 </nav>
 <hr/>
-
 <div class="container">
-
-    <div class="row">
-        <div class="panel-group" role="tablist" aria-multiselectable="true">
-
-            <div class="panel panel-default">
-                <div class="panel-heading" role="tab">
-                    <h4 class="panel-title">
-                        <a role="button" data-toggle="collapse" data-target="#d0b77780-3518-11e5-86fc-8f0ea8ae1a37" aria-expanded="false">
-                            <span class="text-danger"><i class="glyphicon glyphicon-folder-open"/>&nbsp;&nbsp;live_data</span>
-                        </a>
-                    </h4>
-                </div>
-                <div id="d0b77780-3518-11e5-86fc-8f0ea8ae1a37" class="panel-collapse collapse" role="tabpanel">
-                    <div class="panel-body">
-                        <div class="row">
-                            <div class="col-md-2"/>
-                            <div class="col-md-8 col-offset-md-2 table-responsive table-bordered">
-
-
-                                <table class="table">
-                                    <thead>
-                                    <tr><th>Tables</th></tr>
-                                    </thead>
-                                    <tbody>
-
-                                    <tr class="text-primary"><td>complex_table</td></tr>
-
-                                    <tr class="text-primary"><td>sensor_data</td></tr>
-
-                                    <tr class="text-primary"><td>stations</td></tr>
-
-                                    </tbody>
-                                </table>
-                            </div>
-                            <div class="col-md-2"/>
-                        </div>
-                    </div>
-                </div>
+  <div class="row">
+    <div class="panel-group" role="tablist" aria-multiselectable="true">
+      <div class="panel panel-default">
+        <div class="panel-heading" role="tab">
+          <h4 class="panel-title">
+            <a role="button" data-toggle="collapse"  aria-expanded="false">
+              <span class="text-danger">
+                <i class="glyphicon glyphicon-folder-open"/>&nbsp;&nbsp;live_data</span>
+            </a>
+          </h4>
+        </div>
+        <div  class="panel-collapse collapse" role="tabpanel">
+          <div class="panel-body">
+            <div class="row">
+              <div class="col-md-2"/>
+              <div class="col-md-8 col-offset-md-2 table-responsive table-bordered">
+                <table class="table">
+                  <thead>
+                    <tr>
+                      <th>Tables</th>
+                    </tr>
+                  </thead>
+                  <tbody>
+                    <tr class="text-primary">
+                      <td>complex_table</td>
+                    </tr>
+                    <tr class="text-primary">
+                      <td>sensor_data</td>
+                    </tr>
+                    <tr class="text-primary">
+                      <td>stations</td>
+                    </tr>
+                  </tbody>
+                </table>
+              </div>
+              <div class="col-md-2"/>
+            </div>
+          </div>
+        </div>
+      </div>
+      <div class="panel panel-default">
+        <div class="panel-heading" role="tab">
+          <h4 class="panel-title">
+            <a role="button" data-toggle="collapse"  aria-expanded="false">
+              <span class="text-danger">
+                <i class="glyphicon glyphicon-folder-open"/>&nbsp;&nbsp;system</span>
+            </a>
+          </h4>
+        </div>
+        <div  class="panel-collapse collapse" role="tabpanel">
+          <div class="panel-body">
+            <div class="row">
+              <div class="col-md-2"/>
+              <div class="col-md-8 col-offset-md-2 table-responsive table-bordered">
+                <table class="table">
+                  <thead>
+                    <tr>
+                      <th>Tables</th>
+                    </tr>
+                  </thead>
+                  <tbody>
+                    <tr class="text-primary">
+                      <td>&quot;IndexInfo&quot;</td>
+                    </tr>
+                    <tr class="text-primary">
+                      <td>available_ranges</td>
+                    </tr>
+                    <tr class="text-primary">
+                      <td>batches</td>
+                    </tr>
+                    <tr class="text-primary">
+                      <td>batchlog</td>
+                    </tr>
+                    <tr class="text-primary">
+                      <td>built_views</td>
+                    </tr>
+                    <tr class="text-primary">
+                      <td>compaction_history</td>
+                    </tr>
+                    <tr class="text-primary">
+                      <td>hints</td>
+                    </tr>
+                    <tr class="text-primary">
+                      <td>local</td>
+                    </tr>
+                    <tr class="text-primary">
+                      <td>paxos</td>
+                    </tr>
+                    <tr class="text-primary">
+                      <td>peer_events</td>
+                    </tr>
+                    <tr class="text-primary">
+                      <td>peers</td>
+                    </tr>
+                    <tr class="text-primary">
+                      <td>prepared_statements</td>
+                    </tr>
+                    <tr class="text-primary">
+                      <td>range_xfers</td>
+                    </tr>
+                    <tr class="text-primary">
+                      <td>size_estimates</td>
+                    </tr>
+                    <tr class="text-primary">
+                      <td>sstable_activity</td>
+                    </tr>
+                    <tr class="text-primary">
+                      <td>transferred_ranges</td>
+                    </tr>
+                    <tr class="text-primary">
+                      <td>views_builds_in_progress</td>
+                    </tr>
+                  </tbody>
+                </table>
+              </div>
+              <div class="col-md-2"/>
             </div>
-
-            <div class="panel panel-default">
-                <div class="panel-heading" role="tab">
-                    <h4 class="panel-title">
-                        <a role="button" data-toggle="collapse" data-target="#d0b7ecb0-3518-11e5-86fc-8f0ea8ae1a37" aria-expanded="false">
-                            <span class="text-danger"><i class="glyphicon glyphicon-folder-open"/>&nbsp;&nbsp;samples</span>
-                        </a>
-                    </h4>
-                </div>
-                <div id="d0b7ecb0-3518-11e5-86fc-8f0ea8ae1a37" class="panel-collapse collapse" role="tabpanel">
-                    <div class="panel-body">
-                        <div class="row">
-                            <div class="col-md-2"/>
-                            <div class="col-md-8 col-offset-md-2 table-responsive table-bordered">
-
-                                <span><h4>No Table</h4></span>
-
-                            </div>
-                            <div class="col-md-2"/>
-                        </div>
-                    </div>
-                </div>
+          </div>
+        </div>
+      </div>
+      <div class="panel panel-default">
+        <div class="panel-heading" role="tab">
+          <h4 class="panel-title">
+            <a role="button" data-toggle="collapse"  aria-expanded="false">
+              <span class="text-danger">
+                <i class="glyphicon glyphicon-folder-open"/>&nbsp;&nbsp;system_auth</span>
+            </a>
+          </h4>
+        </div>
+        <div  class="panel-collapse collapse" role="tabpanel">
+          <div class="panel-body">
+            <div class="row">
+              <div class="col-md-2"/>
+              <div class="col-md-8 col-offset-md-2 table-responsive table-bordered">
+                <table class="table">
+                  <thead>
+                    <tr>
+                      <th>Tables</th>
+                    </tr>
+                  </thead>
+                  <tbody>
+                    <tr class="text-primary">
+                      <td>resource_role_permissons_index</td>
+                    </tr>
+                    <tr class="text-primary">
+                      <td>role_members</td>
+                    </tr>
+                    <tr class="text-primary">
+                      <td>role_permissions</td>
+                    </tr>
+                    <tr class="text-primary">
+                      <td>roles</td>
+                    </tr>
+                  </tbody>
+                </table>
+              </div>
+              <div class="col-md-2"/>
+            </div>
+          </div>
+        </div>
+      </div>
+      <div class="panel panel-default">
+        <div class="panel-heading" role="tab">
+          <h4 class="panel-title">
+            <a role="button" data-toggle="collapse"  aria-expanded="false">
+              <span class="text-danger">
+                <i class="glyphicon glyphicon-folder-open"/>&nbsp;&nbsp;system_distributed</span>
+            </a>
+          </h4>
+        </div>
+        <div  class="panel-collapse collapse" role="tabpanel">
+          <div class="panel-body">
+            <div class="row">
+              <div class="col-md-2"/>
+              <div class="col-md-8 col-offset-md-2 table-responsive table-bordered">
+                <table class="table">
+                  <thead>
+                    <tr>
+                      <th>Tables</th>
+                    </tr>
+                  </thead>
+                  <tbody>
+                    <tr class="text-primary">
+                      <td>parent_repair_history</td>
+                    </tr>
+                    <tr class="text-primary">
+                      <td>repair_history</td>
+                    </tr>
+                    <tr class="text-primary">
+                      <td>view_build_status</td>
+                    </tr>
+                  </tbody>
+                </table>
+              </div>
+              <div class="col-md-2"/>
             </div>
-
-            <div class="panel panel-default">
-                <div class="panel-heading" role="tab">
-                    <h4 class="panel-title">
-                        <a role="button" data-toggle="collapse" data-target="#d0b7ecb1-3518-11e5-86fc-8f0ea8ae1a37" aria-expanded="false">
-                            <span class="text-danger"><i class="glyphicon glyphicon-folder-open"/>&nbsp;&nbsp;system</span>
-                        </a>
-                    </h4>
-                </div>
-                <div id="d0b7ecb1-3518-11e5-86fc-8f0ea8ae1a37" class="panel-collapse collapse" role="tabpanel">
-                    <div class="panel-body">
-                        <div class="row">
-                            <div class="col-md-2"/>
-                            <div class="col-md-8 col-offset-md-2 table-responsive table-bordered">
-
-
-                                <table class="table">
-                                    <thead>
-                                    <tr><th>Tables</th></tr>
-                                    </thead>
-                                    <tbody>
-
-                                    <tr class="text-primary"><td>IndexInfo</td></tr>
-
-                                    <tr class="text-primary"><td>batchlog</td></tr>
-
-                                    <tr class="text-primary"><td>compaction_history</td></tr>
-
-                                    <tr class="text-primary"><td>compactions_in_progress</td></tr>
-
-                                    <tr class="text-primary"><td>hints</td></tr>
-
-                                    <tr class="text-primary"><td>local</td></tr>
-
-                                    <tr class="text-primary"><td>paxos</td></tr>
-
-                                    <tr class="text-primary"><td>peer_events</td></tr>
-
-                                    <tr class="text-primary"><td>peers</td></tr>
-
-                                    <tr class="text-primary"><td>range_xfers</td></tr>
-
-                                    <tr class="text-primary"><td>schema_columnfamilies</td></tr>
-
-                                    <tr class="text-primary"><td>schema_columns</td></tr>
-
-                                    <tr class="text-primary"><td>schema_keyspaces</td></tr>
-
-                                    <tr class="text-primary"><td>schema_triggers</td></tr>
-
-                                    <tr class="text-primary"><td>schema_usertypes</td></tr>
-
-                                    <tr class="text-primary"><td>size_estimates</td></tr>
-
-                                    <tr class="text-primary"><td>sstable_activity</td></tr>
-
-                                    </tbody>
-                                </table>
-                            </div>
-                            <div class="col-md-2"/>
-                        </div>
-                    </div>
-                </div>
+          </div>
+        </div>
+      </div>
+      <div class="panel panel-default">
+        <div class="panel-heading" role="tab">
+          <h4 class="panel-title">
+            <a role="button" data-toggle="collapse"  aria-expanded="false">
+              <span class="text-danger">
+                <i class="glyphicon glyphicon-folder-open"/>&nbsp;&nbsp;system_schema</span>
+            </a>
+          </h4>
+        </div>
+        <div  class="panel-collapse collapse" role="tabpanel">
+          <div class="panel-body">
+            <div class="row">
+              <div class="col-md-2"/>
+              <div class="col-md-8 col-offset-md-2 table-responsive table-bordered">
+                <table class="table">
+                  <thead>
+                    <tr>
+                      <th>Tables</th>
+                    </tr>
+                  </thead>
+                  <tbody>
+                    <tr class="text-primary">
+                      <td>aggregates</td>
+                    </tr>
+                    <tr class="text-primary">
+                      <td>columns</td>
+                    </tr>
+                    <tr class="text-primary">
+                      <td>dropped_columns</td>
+                    </tr>
+                    <tr class="text-primary">
+                      <td>functions</td>
+                    </tr>
+                    <tr class="text-primary">
+                      <td>indexes</td>
+                    </tr>
+                    <tr class="text-primary">
+                      <td>keyspaces</td>
+                    </tr>
+                    <tr class="text-primary">
+                      <td>tables</td>
+                    </tr>
+                    <tr class="text-primary">
+                      <td>triggers</td>
+                    </tr>
+                    <tr class="text-primary">
+                      <td>types</td>
+                    </tr>
+                    <tr class="text-primary">
+                      <td>views</td>
+                    </tr>
+                  </tbody>
+                </table>
+              </div>
+              <div class="col-md-2"/>
             </div>
-
-            <div class="panel panel-default">
-                <div class="panel-heading" role="tab">
-                    <h4 class="panel-title">
-                        <a role="button" data-toggle="collapse" data-target="#d0b813c0-3518-11e5-86fc-8f0ea8ae1a37" aria-expanded="false">
-                            <span class="text-danger"><i class="glyphicon glyphicon-folder-open"/>&nbsp;&nbsp;system_traces</span>
-                        </a>
-                    </h4>
-                </div>
-                <div id="d0b813c0-3518-11e5-86fc-8f0ea8ae1a37" class="panel-collapse collapse" role="tabpanel">
-                    <div class="panel-body">
-                        <div class="row">
-                            <div class="col-md-2"/>
-                            <div class="col-md-8 col-offset-md-2 table-responsive table-bordered">
-
-
-                                <table class="table">
-                                    <thead>
-                                    <tr><th>Tables</th></tr>
-                                    </thead>
-                                    <tbody>
-
-                                    <tr class="text-primary"><td>events</td></tr>
-
-                                    <tr class="text-primary"><td>sessions</td></tr>
-
-                                    </tbody>
-                                </table>
-                            </div>
-                            <div class="col-md-2"/>
-                        </div>
-                    </div>
-                </div>
+          </div>
+        </div>
+      </div>
+      <div class="panel panel-default">
+        <div class="panel-heading" role="tab">
+          <h4 class="panel-title">
+            <a role="button" data-toggle="collapse"  aria-expanded="false">
+              <span class="text-danger">
+                <i class="glyphicon glyphicon-folder-open"/>&nbsp;&nbsp;system_traces</span>
+            </a>
+          </h4>
+        </div>
+        <div  class="panel-collapse collapse" role="tabpanel">
+          <div class="panel-body">
+            <div class="row">
+              <div class="col-md-2"/>
+              <div class="col-md-8 col-offset-md-2 table-responsive table-bordered">
+                <table class="table">
+                  <thead>
+                    <tr>
+                      <th>Tables</th>
+                    </tr>
+                  </thead>
+                  <tbody>
+                    <tr class="text-primary">
+                      <td>events</td>
+                    </tr>
+                    <tr class="text-primary">
+                      <td>sessions</td>
+                    </tr>
+                  </tbody>
+                </table>
+              </div>
+              <div class="col-md-2"/>
             </div>
-
-            <div class="panel panel-default">
-                <div class="panel-heading" role="tab">
-                    <h4 class="panel-title">
-                        <a role="button" data-toggle="collapse" data-target="#d0b813c1-3518-11e5-86fc-8f0ea8ae1a37" aria-expanded="false">
-                            <span class="text-danger"><i class="glyphicon glyphicon-folder-open"/>&nbsp;&nbsp;zeppelin</span>
-                        </a>
-                    </h4>
-                </div>
-                <div id="d0b813c1-3518-11e5-86fc-8f0ea8ae1a37" class="panel-collapse collapse" role="tabpanel">
-                    <div class="panel-body">
-                        <div class="row">
-                            <div class="col-md-2"/>
-                            <div class="col-md-8 col-offset-md-2 table-responsive table-bordered">
-
-
-                                <table class="table">
-                                    <thead>
-                                    <tr><th>Tables</th></tr>
-                                    </thead>
-                                    <tbody>
-
-                                    <tr class="text-primary"><td>artists</td></tr>
-
-                                    <tr class="text-primary"><td>prepared</td></tr>
-
-                                    <tr class="text-primary"><td>ts</td></tr>
-
-                                    <tr class="text-primary"><td>users</td></tr>
-
-                                    </tbody>
-                                </table>
-                            </div>
-                            <div class="col-md-2"/>
-                        </div>
-                    </div>
-                </div>
+          </div>
+        </div>
+      </div>
+      <div class="panel panel-default">
+        <div class="panel-heading" role="tab">
+          <h4 class="panel-title">
+            <a role="button" data-toggle="collapse"  aria-expanded="false">
+              <span class="text-danger">
+                <i class="glyphicon glyphicon-folder-open"/>&nbsp;&nbsp;zeppelin</span>
+            </a>
+          </h4>
+        </div>
+        <div  class="panel-collapse collapse" role="tabpanel">
+          <div class="panel-body">
+            <div class="row">
+              <div class="col-md-2"/>
+              <div class="col-md-8 col-offset-md-2 table-responsive table-bordered">
+                <table class="table">
+                  <thead>
+                    <tr>
+                      <th>Tables</th>
+                    </tr>
+                  </thead>
+                  <tbody>
+                    <tr class="text-primary">
+                      <td>artists</td>
+                    </tr>
+                    <tr class="text-primary">
+                      <td>no_select</td>
+                    </tr>
+                    <tr class="text-primary">
+                      <td>prepared</td>
+                    </tr>
+                    <tr class="text-primary">
+                      <td>ts</td>
+                    </tr>
+                    <tr class="text-primary">
+                      <td>users</td>
+                    </tr>
+                  </tbody>
+                </table>
+              </div>
+              <div class="col-md-2"/>
             </div>
-
+          </div>
         </div>
+      </div>
     </div>
-</div>
\ No newline at end of file
+  </div>
+</div>
diff --git a/cassandra/src/test/resources/scalate/DescribeTypes.html b/cassandra/src/test/resources/scalate/DescribeTypes.html
new file mode 100644
index 0000000..d13af85
--- /dev/null
+++ b/cassandra/src/test/resources/scalate/DescribeTypes.html
@@ -0,0 +1,179 @@
+<br/>
+<br/>
+<nav class="navbar navbar-default">
+  <ul class="nav navbar-nav">
+    <li role="presentation" class="dropdown">
+      <a class="dropdown-toggle" data-toggle="dropdown" role="button" aria-haspopup="true" aria-expanded="false">
+        <span class="text-muted">
+          <i class="glyphicon glyphicon-dashboard"/>&nbsp;<strong>Test Cluster</strong>
+        </span>
+        <span class="text-muted caret">
+        </span>
+        <ul class="dropdown-menu">
+          <li class="dropdown-header">
+            <span class="text-danger">Keyspaces</span>
+          </li>
+          <li>
+            <a role="button" data-toggle="collapse" >
+              <span class="text-danger">
+                <i class="glyphicon glyphicon-folder-open"/>&nbsp;&nbsp;live_data</span>
+            </a>
+          </li>
+          <li>
+            <a role="button" data-toggle="collapse" >
+              <span class="text-danger">
+                <i class="glyphicon glyphicon-folder-open"/>&nbsp;&nbsp;zeppelin</span>
+            </a>
+          </li>
+        </ul>
+      </a>
+    </li>
+    <li>
+      <a>
+        <strong>DESCRIBE TYPES;</strong>
+      </a>
+    </li>
+  </ul>
+  <ul class="nav navbar-nav navbar-right">
+    <li class="dropdown">
+      <a class="dropdown-toggle" data-toggle="dropdown" role="button" aria-haspopup="true" aria-expanded="false">
+        <strong>Legend</strong>
+        <span class="caret">
+        </span>
+      </a>
+      <ul class="dropdown-menu">
+        <li>
+          <a role="button">
+            <i class="glyphicon glyphicon-dashboard text-muted" />&nbsp;Cluster</a>
+        </li>
+        <li>
+          <a role="button">
+            <i class="glyphicon glyphicon-folder-open text-danger" />&nbsp;&nbsp;Keyspace</a>
+        </li>
+        <li>
+          <a role="button">
+            <i class="glyphicon glyphicon-copyright-mark text-warning" />&nbsp;&nbsp;UDT</a>
+        </li>
+        <li>
+          <a role="button">
+            <i class="glyphicon glyphicon-th-list text-primary" />&nbsp;&nbsp;Table</a>
+        </li>
+        <li>
+          <a role="button">
+            <i class="glyphicon glyphicon-eye-open text-primary" />&nbsp;&nbsp;Materialized View</a>
+        </li>
+        <li>
+          <a role="button">
+            <i class="glyphicon glyphicon-random text-success" />&nbsp;&nbsp;Function</a>
+        </li>
+        <li>
+          <a role="button">
+            <i class="glyphicon glyphicon-retweet text-success" />&nbsp;&nbsp;Aggregate</a>
+        </li>
+        <li role="separator" class="divider text-muted">
+        </li>
+        <li class="dropdown-header">
+          <span class="text-primary">Table icons</span>
+        </li>
+        <li class="bg-info">
+          <a role="button">
+            <i class="glyphicon glyphicon-fullscreen" />&nbsp;&nbsp;Partition Key</a>
+        </li>
+        <li class="bg-warning">
+          <a role="button">
+            <i class="glyphicon glyphicon-pushpin" />&nbsp;&nbsp;Static Column</a>
+        </li>
+        <li class="bg-success">
+          <a role="button">
+            <i class="glyphicon glyphicon-sort" />&nbsp;&nbsp;Clustering Column</a>
+        </li>
+        <li class="bg-success">
+          <a role="button">
+            <i class="glyphicon glyphicon-sort-by-attributes" />&nbsp;&nbsp;Clustering Order ASC</a>
+        </li>
+        <li class="bg-success">
+          <a role="button">
+            <i class="glyphicon glyphicon-sort-by-attributes-alt" />&nbsp;&nbsp;Clustering Order DESC</a>
+        </li>
+      </ul>
+    </li>
+    <li>
+      <a href="#">
+      </a>
+    </li>
+  </ul>
+</nav>
+<hr/>
+<div class="container">
+  <div class="row">
+    <div class="panel-group" role="tablist" aria-multiselectable="true">
+      <div class="panel panel-default">
+        <div class="panel-heading" role="tab">
+          <h4 class="panel-title">
+            <a role="button" data-toggle="collapse"  aria-expanded="false">
+              <span class="text-danger">
+                <i class="glyphicon glyphicon-folder-open"/>&nbsp;&nbsp;live_data</span>
+            </a>
+          </h4>
+        </div>
+        <div  class="panel-collapse collapse" role="tabpanel">
+          <div class="panel-body">
+            <div class="row">
+              <div class="col-md-2"/>
+              <div class="col-md-8 col-offset-md-2 table-responsive table-bordered">
+                <table class="table">
+                  <thead>
+                    <tr>
+                      <th>UDT</th>
+                    </tr>
+                  </thead>
+                  <tbody>
+                    <tr class="text-warning">
+                      <td>address</td>
+                    </tr>
+                    <tr class="text-warning">
+                      <td>geolocation</td>
+                    </tr>
+                  </tbody>
+                </table>
+              </div>
+              <div class="col-md-2"/>
+            </div>
+          </div>
+        </div>
+      </div>
+      <div class="panel panel-default">
+        <div class="panel-heading" role="tab">
+          <h4 class="panel-title">
+            <a role="button" data-toggle="collapse"  aria-expanded="false">
+              <span class="text-danger">
+                <i class="glyphicon glyphicon-folder-open"/>&nbsp;&nbsp;zeppelin</span>
+            </a>
+          </h4>
+        </div>
+        <div  class="panel-collapse collapse" role="tabpanel">
+          <div class="panel-body">
+            <div class="row">
+              <div class="col-md-2"/>
+              <div class="col-md-8 col-offset-md-2 table-responsive table-bordered">
+                <table class="table">
+                  <thead>
+                    <tr>
+                      <th>UDT</th>
+                    </tr>
+                  </thead>
+                  <tbody>
+                    <tr class="text-warning">
+                      <td>address</td>
+                    </tr>
+                  </tbody>
+                </table>
+              </div>
+              <div class="col-md-2"/>
+            </div>
+          </div>
+        </div>
+      </div>
+    </div>
+  </div>
+</div>
diff --git a/cassandra/src/test/resources/scalate/Help.html b/cassandra/src/test/resources/scalate/Help.html
index d784fe3..ce702ab 100644
--- a/cassandra/src/test/resources/scalate/Help.html
+++ b/cassandra/src/test/resources/scalate/Help.html
@@ -1 +1 @@
-<br/><br/><nav class="navbar navbar-default"><ul class="nav navbar-nav"><li role="presentation" class="dropdown"><a class="dropdown-toggle" data-toggle="dropdown" role="button" aria-haspopup="true" aria-expanded="false"><span class="text-info"><i class="glyphicon glyphicon-book"/>&nbsp;<strong>Please select ...</strong></span><span class="text-info caret"></span><ul class="dropdown-menu"><li class="dropdown-header"><span class="text-info">Topics</span></li><li><a role="button" data-toggl [...]
\ No newline at end of file
+<br/><br/><nav class="navbar navbar-default"><ul class="nav navbar-nav"><li role="presentation" class="dropdown"><a class="dropdown-toggle" data-toggle="dropdown" role="button" aria-haspopup="true" aria-expanded="false"><span class="text-info"><i class="glyphicon glyphicon-book"/>&nbsp;<strong>Please select ...</strong></span><span class="text-info caret"></span><ul class="dropdown-menu"><li class="dropdown-header"><span class="text-info">Topics</span></li><li><a role="button" data-toggl [...]
\ No newline at end of file
diff --git a/cassandra/src/test/resources/scalate/NoResultWithExecutionInfo.html b/cassandra/src/test/resources/scalate/NoResultWithExecutionInfo.html
index c8975c8..9ca4d45 100644
--- a/cassandra/src/test/resources/scalate/NoResultWithExecutionInfo.html
+++ b/cassandra/src/test/resources/scalate/NoResultWithExecutionInfo.html
@@ -1,42 +1 @@
-<div class="container">
-    <div class="row text-center">
-        <h4>No Result</h4>
-    </div>
-    <br/>
-    <div class="row">
-        <div class="col-md-3"></div>
-        <div class="col-md-6 col-offset-md-3 table-responsive table-bordered">
-            <table class="table">
-                <caption><h5>Last query execution info</h5></caption>
-                <thead>
-                <tr>
-                    <th>Info</th>
-                    <th>Value</th>
-                </tr>
-                </thead>
-                <tbody>
-                <tr>
-                    <td>Statement</td>
-                    <td>CREATE TABLE IF NOT EXISTS no_select(id int PRIMARY KEY);</td>
-                </tr>
-                <tr>
-                    <td>Achieved Consistency</td>
-                    <td>N/A</td>
-                </tr>
-                <tr>
-                    <td>Tried Hosts</td>
-                    <td>TRIED_HOSTS</td>
-                </tr>
-                <tr>
-                    <td>Queried Hosts</td>
-                    <td>QUERIED_HOSTS</td>
-                </tr>
-                <tr>
-                    <td>Schema In Agreement</td>
-                    <td>true</td>
-                </tr>
-                </tbody>
-            </table>
-        </div>
-    </div>
-</div>
\ No newline at end of file
+<div class="container"><div class="row text-center"><h4>No Result</h4></div><br/><div class="row"><div class="col-md-3"></div><div class="col-md-6 col-offset-md-3 table-responsive table-bordered"><table class="table"><caption><h5>Last query execution info</h5></caption><thead><tr><th>Info</th><th>Value</th></tr></thead><tbody><tr><td>Statement</td><td>CREATE TABLE IF NOT EXISTS no_select(id int PRIMARY KEY);</td></tr><tr><td>Tried Hosts</td><td>localhost:9142</td></tr><tr><td>Queried Hos [...]
\ No newline at end of file
diff --git a/cassandra/src/test/scala/org/apache/zeppelin/cassandra/EnhancedSessionTest.scala b/cassandra/src/test/scala/org/apache/zeppelin/cassandra/EnhancedSessionTest.scala
index c9ba19f..006fc14 100644
--- a/cassandra/src/test/scala/org/apache/zeppelin/cassandra/EnhancedSessionTest.scala
+++ b/cassandra/src/test/scala/org/apache/zeppelin/cassandra/EnhancedSessionTest.scala
@@ -17,7 +17,7 @@
 
 package org.apache.zeppelin.cassandra
 
-import com.datastax.driver.core.{BatchStatement, SimpleStatement}
+import com.datastax.oss.driver.api.core.cql.{BatchStatement, BatchType, SimpleStatement}
 import org.scalatest.FlatSpec
 
 class EnhancedSessionTest extends FlatSpec {
@@ -48,24 +48,24 @@ class EnhancedSessionTest extends FlatSpec {
 
   it should "be detected as DDL for create in simple statement" in {
     assertResult(true) {
-      EnhancedSession.isDDLStatement(new SimpleStatement("create TABLE if not exists test.test(id int primary key);"))
+      EnhancedSession.isDDLStatement(SimpleStatement.newInstance("create TABLE if not exists test.test(id int primary key);"))
     }
   }
 
   it should "be detected as DDL for create in batch statement" in {
-    val batch = new BatchStatement
-    batch.add(new SimpleStatement("create TABLE if not exists test.test(id int primary key);"))
-    batch.add(new SimpleStatement("insert into test.test(id) values(1);"))
+    val batch = BatchStatement.newInstance(BatchType.UNLOGGED)
+      .add(SimpleStatement.newInstance("create TABLE if not exists test.test(id int primary key);"))
+      .add(SimpleStatement.newInstance("insert into test.test(id) values(1);"))
     assertResult(true) {
       EnhancedSession.isDDLStatement(batch)
     }
   }
 
   it should "not be detected as DDL for only inserts in batch statement" in {
-    val batch = new BatchStatement
-    batch.add(new SimpleStatement("insert into test.test(id) values(1);"))
-    batch.add(new SimpleStatement("insert into test.test(id) values(2);"))
-    batch.add(new SimpleStatement("insert into test.test(id) values(3);"))
+    val batch = BatchStatement.newInstance(BatchType.LOGGED)
+      .add(SimpleStatement.newInstance("insert into test.test(id) values(1);"))
+      .add(SimpleStatement.newInstance("insert into test.test(id) values(2);"))
+      .add(SimpleStatement.newInstance("insert into test.test(id) values(3);"))
     assertResult(false) {
       EnhancedSession.isDDLStatement(batch)
     }
diff --git a/cassandra/src/test/scala/org/apache/zeppelin/cassandra/ParagraphParserTest.scala b/cassandra/src/test/scala/org/apache/zeppelin/cassandra/ParagraphParserTest.scala
index 4c5c929..7998390 100644
--- a/cassandra/src/test/scala/org/apache/zeppelin/cassandra/ParagraphParserTest.scala
+++ b/cassandra/src/test/scala/org/apache/zeppelin/cassandra/ParagraphParserTest.scala
@@ -16,7 +16,8 @@
  */
 package org.apache.zeppelin.cassandra
 
-import com.datastax.driver.core._
+import com.datastax.oss.driver.api.core.{ConsistencyLevel, CqlSession}
+import com.datastax.oss.driver.api.core.cql.{BatchStatement, BatchType, PreparedStatement}
 import org.apache.zeppelin.interpreter.InterpreterException
 import org.scalatest.mock.MockitoSugar
 import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
@@ -30,7 +31,7 @@ class ParagraphParserTest extends FlatSpec
   with Matchers
   with MockitoSugar {
 
-  val session: Session = mock[Session]
+  val session: CqlSession = mock[CqlSession]
   val preparedStatements:collection.mutable.Map[String,PreparedStatement] = collection.mutable.Map()
   val parser: ParagraphParser = new ParagraphParser()
 
@@ -62,14 +63,14 @@ class ParagraphParserTest extends FlatSpec
     parsed should matchPattern {
       case parser.Success(List(
         SimpleStm("SELECT * FROM albums LIMIT 10;"),
-        BatchStm(BatchStatement.Type.UNLOGGED,
+        BatchStm(BatchType.UNLOGGED,
           List(
             SimpleStm("INSERT INTO users(id) VALUES(10);"),
             BoundStm("test","'a',12.34")
           )
         ),
         SimpleStm("SELECT * FROM users LIMIT 10;"),
-        BatchStm(BatchStatement.Type.LOGGED,
+        BatchStm(BatchType.LOGGED,
           List(
             SimpleStm("Insert INTO users(id) VALUES(11);"),
             SimpleStm("INSERT INTO users(id) VALUES(12);")
@@ -162,20 +163,6 @@ class ParagraphParserTest extends FlatSpec
     ex.getMessage should be(s"Invalid syntax for @timestamp. It should comply to the pattern ${TIMESTAMP_PATTERN.toString}")
   }
 
-  "Parser" should "parse retry policy" in {
-    val query:String ="@retryPolicy="+CassandraInterpreter.DOWNGRADING_CONSISTENCY_RETRY
-    val parsed = parser.parseAll(parser.retryPolicy, query)
-    parsed should matchPattern {case parser.Success(DowngradingRetryPolicy, _) => }
-  }
-
-  "Parser" should "fails parsing invalid retry policy" in {
-    val query:String =""" @retryPolicy=TEST""".stripMargin
-    val ex = intercept[InterpreterException] {
-      parser.parseAll(parser.retryPolicy, query)
-    }
-    ex.getMessage should be(s"Invalid syntax for @retryPolicy. It should comply to the pattern ${RETRY_POLICIES_PATTERN.toString}")
-  }
-
   "Parser" should "parse fetch size" in {
     val query:String ="@fetchSize=100"
     val parsed = parser.parseAll(parser.fetchSize, query)
@@ -201,7 +188,7 @@ class ParagraphParserTest extends FlatSpec
     val query:String =""" sElecT * FROM users LIMIT ? ;""".stripMargin
 
     //When
-    val parsed = parser.parseAll(parser.genericStatement, query)
+    val parsed = parser.parseAll(parser.genericStatement(), query)
 
     //Then
     parsed should matchPattern { case parser.Success(SimpleStm("sElecT * FROM users LIMIT ? ;"), _) =>}
@@ -212,7 +199,7 @@ class ParagraphParserTest extends FlatSpec
     val query:String =""" @prepare[select_users]=SELECT * FROM users LIMIT ? """.stripMargin
 
     //When
-    val parsed = parser.parseAll(parser.prepare, query)
+    val parsed = parser.parseAll(parser.prepare(), query)
 
     //Then
     parsed should matchPattern { case parser.Success(PrepareStm("select_users","SELECT * FROM users LIMIT ?"), _) => }
@@ -221,7 +208,7 @@ class ParagraphParserTest extends FlatSpec
   "Parser" should "fails parsing invalid prepared statement" in {
     val query:String =""" @prepare=SELECT * FROM users LIMIT ?""".stripMargin
     val ex = intercept[InterpreterException] {
-      parser.parseAll(parser.prepare, query)
+      parser.parseAll(parser.prepare(), query)
     }
     ex.getMessage should be(s"Invalid syntax for @prepare. It should comply to the pattern: @prepare[prepared_statement_name]=CQL Statement (without semi-colon)")
   }
@@ -231,7 +218,7 @@ class ParagraphParserTest extends FlatSpec
     val query:String =""" @remove_prepare[select_users  ]""".stripMargin
 
     //When
-    val parsed = parser.parseAll(parser.removePrepare, query)
+    val parsed = parser.parseAll(parser.removePrepare(), query)
 
     //Then
     parsed should matchPattern { case parser.Success(RemovePrepareStm("select_users"), _) => }
@@ -240,7 +227,7 @@ class ParagraphParserTest extends FlatSpec
   "Parser" should "fails parsing invalid remove prepared statement" in {
     val query:String =""" @remove_prepare[select_users]=SELECT * FROM users LIMIT ?""".stripMargin
     val ex = intercept[InterpreterException] {
-      parser.parseAll(parser.removePrepare, query)
+      parser.parseAll(parser.removePrepare(), query)
     }
     ex.getMessage should be(s"Invalid syntax for @remove_prepare. It should comply to the pattern: @remove_prepare[prepared_statement_name]")
   }
@@ -250,7 +237,7 @@ class ParagraphParserTest extends FlatSpec
     val query:String =""" @bind[select_users  ]=10,'toto'""".stripMargin
 
     //When
-    val parsed = parser.parseAll(parser.bind, query)
+    val parsed = parser.parseAll(parser.bind(), query)
 
     //Then
     parsed should matchPattern { case parser.Success(BoundStm("select_users","10,'toto'"), _) => }
@@ -259,7 +246,7 @@ class ParagraphParserTest extends FlatSpec
   "Parser" should "fails parsing invalid bind statement" in {
     val query:String =""" @bind[select_users]=""".stripMargin
     val ex = intercept[InterpreterException] {
-      parser.parseAll(parser.bind, query)
+      parser.parseAll(parser.bind(), query)
     }
     ex.getMessage should be("""Invalid syntax for @bind. It should comply to the pattern: @bind[prepared_statement_name]=10,'jdoe','John DOE',12345,'2015-07-32 12:04:23.234' OR @bind[prepared_statement_name] with no bound value. No semi-colon""")
   }
@@ -280,7 +267,7 @@ class ParagraphParserTest extends FlatSpec
     //Then
     parsed should matchPattern {
       case parser.Success(BatchStm(
-      BatchStatement.Type.LOGGED,
+      BatchType.LOGGED,
       List(
         SimpleStm("Insert INTO users(id) VALUES(10);"),
         BoundStm("select_users", "10,'toto'"),
@@ -346,7 +333,6 @@ class ParagraphParserTest extends FlatSpec
       "   INSERT INTO zeppelin.albums(title,artist,year) VALUES('Primitive','Soulfly',2003);\n"+
       "APPLY BATCH;\n"+
       "@timestamp=10\n" +
-      "@retryPolicy=DOWNGRADING_CONSISTENCY\n" +
       "SELECT * FROM zeppelin.albums;"
 
     val parsed = parser.parseAll(parser.queries, query)
@@ -356,7 +342,7 @@ class ParagraphParserTest extends FlatSpec
       SimpleStm("CREATE TABLE IF NOT EXISTS zeppelin.albums(\n    title text PRIMARY KEY,\n    artist text,\n    year int\n);"),
       Consistency(ConsistencyLevel.THREE),
       SerialConsistency(ConsistencyLevel.SERIAL),
-      BatchStm(BatchStatement.Type.LOGGED,
+      BatchStm(BatchType.LOGGED,
         List(
           SimpleStm("INSERT INTO zeppelin.albums(title,artist,year) VALUES('The Impossible Dream EP','Carter the Unstoppable Sex Machine',1992);"),
           SimpleStm("INSERT INTO zeppelin.albums(title,artist,year) VALUES('The Way You Are','Tears for Fears',1983);"),
@@ -364,7 +350,6 @@ class ParagraphParserTest extends FlatSpec
         )
       ),
       Timestamp(10L),
-      DowngradingRetryPolicy,
       SimpleStm("SELECT * FROM zeppelin.albums;")
       ), _) =>
     }
@@ -389,7 +374,7 @@ class ParagraphParserTest extends FlatSpec
     parsed should matchPattern {
       case parser.Success(List(
       SimpleStm("CREATE TABLE IF NOT EXISTS zeppelin.albums(\n    title text PRIMARY KEY,\n    artist text,\n    year int\n);"),
-      BatchStm(BatchStatement.Type.LOGGED,
+      BatchStm(BatchType.LOGGED,
         List(
           SimpleStm("INSERT INTO zeppelin.albums(title,artist,year) VALUES('The Impossible Dream EP','Carter the Unstoppable Sex Machine',1992);"),
           SimpleStm("INSERT INTO zeppelin.albums(title,artist,year) VALUES('The Way You Are','Tears for Fears',1983);"),
diff --git a/docs/interpreter/cassandra.md b/docs/interpreter/cassandra.md
index 3e53725..1837c53 100644
--- a/docs/interpreter/cassandra.md
+++ b/docs/interpreter/cassandra.md
@@ -79,7 +79,7 @@ The **Cassandra** interpreter accepts the following commands
     </tr>
     <tr>
       <td nowrap>Option commands</td>
-      <td>`@consistency`, `@retryPolicy`, `@fetchSize` ...</td>
+      <td>`@consistency`, `@fetchSize` ...</td>
       <td>Inject runtime options to all statements in the paragraph</td>
     </tr>
     <tr>
@@ -113,9 +113,8 @@ Each statement should be separated by a semi-colon ( **;** ) except the special
 4. `@consistency`
 5. `@serialConsistency`
 6. `@timestamp`
-7. `@retryPolicy`
-8. `@fetchSize`
-9. `@requestTimeOut`
+7. `@fetchSize`
+8. `@requestTimeOut`
 
 Multi-line statements as well as multiple statements on the same line are also supported as long as they are separated by a semi-colon. Ex:
 
@@ -165,8 +164,8 @@ The complete list of all CQL statements and versions can be found below:
      <td><strong>3.x</strong></td>
      <td>
         <a target="_blank"
-          href="http://docs.datastax.com/en/cql/3.3/cql/cqlIntro.html">
-          http://docs.datastax.com/en/cql/3.3/cql/cqlIntro.html
+          href="https://docs.datastax.com/en/archived/cql/3.3/cql/cqlIntro.html">
+          https://docs.datastax.com/en/archived/cql/3.3/cql/cqlIntro.html
         </a>
      </td>
    </tr>   
@@ -174,8 +173,8 @@ The complete list of all CQL statements and versions can be found below:
      <td><strong>2.2</strong></td>
      <td>
         <a target="_blank"
-          href="http://docs.datastax.com/en/cql/3.3/cql/cqlIntro.html">
-          http://docs.datastax.com/en/cql/3.3/cql/cqlIntro.html
+          href="https://docs.datastax.com/en/archived/cql/3.3/cql/cqlIntro.html">
+          https://docs.datastax.com/en/archived/cql/3.3/cql/cqlIntro.html
         </a>
      </td>
    </tr>
@@ -192,8 +191,8 @@ The complete list of all CQL statements and versions can be found below:
      <td><strong>1.2</strong></td>
      <td>
         <a target="_blank"
-          href="http://docs.datastax.com/en/cql/3.0/cql/aboutCQL.html">
-          http://docs.datastax.com/en/cql/3.0/cql/aboutCQL.html
+          href="https://docs.datastax.com/en/archived/cql/3.1/cql/cql_intro_c.html">
+          https://docs.datastax.com/en/archived/cql/3.1/cql/cql_intro_c.html
         </a>
      </td>
    </tr>
@@ -351,11 +350,6 @@ Below is the list of all parameters:
       </td>
    </tr>
    <tr>
-     <td nowrap>Retry Policy</td>
-     <td><strong>@retryPolicy=<em>value</em></strong></td>
-     <td>Apply the given retry policy to all queries in the paragraph</td>
-   </tr>
-   <tr>
      <td nowrap>Fetch Size</td>
      <td><strong>@fetchSize=<em>integer value</em></strong></td>
      <td>Apply the given fetch size to all queries in the paragraph</td>
@@ -389,10 +383,6 @@ Some parameters only accept restricted values:
      <td>Any long value</td>
    </tr>
    <tr>
-     <td nowrap>Retry Policy</td>
-     <td><strong>DEFAULT, DOWNGRADING_CONSISTENCY, FALLTHROUGH, LOGGING_DEFAULT, LOGGING_DOWNGRADING, LOGGING_FALLTHROUGH</strong></td>
-   </tr>
-   <tr>
      <td nowrap>Fetch Size</td>
      <td>Any integer value</td>
    </tr>
@@ -494,6 +484,7 @@ Bound values are not mandatory for the **@bind** statement. However if you provi
 * Date values should be enclosed between simple quotes (**'**) and respect the formats (full list is in the [documentation](https://docs.datastax.com/en/cql/3.3/cql/cql_reference/timestamp_type_r.html)):
   1. yyyy-MM-dd HH:MM:ss
   2. yyyy-MM-dd HH:MM:ss.SSS
+  3. yyyy-MM-dd'T'HH:mm:ss.SSSZ
 * **null** is parsed as-is
 * **boolean** (`true`|`false`) are parsed as-is
 * collection values must follow the **[standard CQL syntax]**:
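As a sketch, the three timestamp formats listed above roughly correspond to the following parse patterns (these Python patterns are approximations of the CQL formats; the sample values are made up for illustration):

```python
from datetime import datetime

# Rough Python equivalents of the three CQL timestamp formats above.
samples = [
    ("2020-05-01 11:28:34",          "%Y-%m-%d %H:%M:%S"),
    ("2020-05-01 11:28:34.123",      "%Y-%m-%d %H:%M:%S.%f"),
    ("2020-05-01T11:28:34.123+0000", "%Y-%m-%dT%H:%M:%S.%f%z"),
]
for value, pattern in samples:
    # strptime raises ValueError if the value does not match the pattern
    print(datetime.strptime(value, pattern).isoformat())
```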
@@ -584,7 +575,7 @@ The **isolated** mode is the most extreme and will create as many JVM/`com.datas
 ## Interpreter Configuration
 
 To configure the **Cassandra** interpreter, go to the **Interpreter** menu and scroll down to change the parameters.
-The **Cassandra** interpreter is using the official **[Cassandra Java Driver]** and most of the parameters are used
+The **Cassandra** interpreter uses the official **[Datastax Java Driver for Apache Cassandra]®** and most of the parameters are used
 to configure the Java driver.
 
 Below are the configuration parameters and their default values.
@@ -644,10 +635,9 @@ Below are the configuration parameters and their default values.
    <tr>
      <td>`cassandra.load.balancing.policy`</td>
      <td>
-        Load balancing policy. Default = `new TokenAwarePolicy(new DCAwareRoundRobinPolicy())`
+        Load balancing policy. Default = `DefaultLoadBalancingPolicy`.
         To specify your own policy, provide the <em>fully qualified class name (FQCN)</em> of your policy.
-        At runtime the interpreter will instantiate the policy using
-        <strong>Class.forName(FQCN)</strong>
+        At runtime the driver will instantiate the policy using its class name.
      </td>
      <td>DEFAULT</td>
    </tr>
@@ -657,12 +647,12 @@ Below are the configuration parameters and their default values.
      <td>10</td>
    </tr>
    <tr>
-     <td>`cassandra.pooling.core.connection.per.host.local`</td>
+     <td>`cassandra.pooling.connection.per.host.local`</td>
      <td>Protocol V2 and below default = 2. Protocol V3 and above default = 1</td>
      <td>2</td>
    </tr>
    <tr>
-     <td>`cassandra.pooling.core.connection.per.host.remote`</td>
+     <td>`cassandra.pooling.connection.per.host.remote`</td>
      <td>Protocol V2 and below default = 1. Protocol V3 and above default = 1</td>
      <td>1</td>
    </tr>
@@ -672,49 +662,19 @@ Below are the configuration parameters and their default values.
      <td>30</td>
    </tr>
    <tr>
-     <td>`cassandra.pooling.idle.timeout.seconds`</td>
-     <td>Cassandra idle time out in seconds</td>
-     <td>120</td>
-   </tr>
-   <tr>
-     <td>`cassandra.pooling.max.connection.per.host.local`</td>
-     <td>Protocol V2 and below default = 8. Protocol V3 and above default = 1</td>
-     <td>8</td>
-   </tr>
-   <tr>
-     <td>`cassandra.pooling.max.connection.per.host.remote`</td>
-     <td>Protocol V2 and below default = 2. Protocol V3 and above default = 1</td>
-     <td>2</td>
-   </tr>
-   <tr>
-     <td>`cassandra.pooling.max.request.per.connection.local`</td>
+     <td>`cassandra.pooling.max.request.per.connection`</td>
      <td>Protocol V2 and below default = 128. Protocol V3 and above default = 1024</td>
      <td>128</td>
    </tr>
    <tr>
-     <td>`cassandra.pooling.max.request.per.connection.remote`</td>
-     <td>Protocol V2 and below default = 128. Protocol V3 and above default = 256</td>
-     <td>128</td>
-   </tr>
-   <tr>
-     <td>`cassandra.pooling.new.connection.threshold.local`</td>
-     <td>Protocol V2 and below default = 100. Protocol V3 and above default = 800</td>
-     <td>100</td>
-   </tr>
-   <tr>
-     <td>`cassandra.pooling.new.connection.threshold.remote`</td>
-     <td>Protocol V2 and below default = 100. Protocol V3 and above default = 200</td>
-     <td>100</td>
-   </tr>
-   <tr>
      <td>`cassandra.pooling.pool.timeout.millisecs`</td>
      <td>Cassandra pool time out in millisecs</td>
      <td>5000</td>
    </tr>
    <tr>
      <td>`cassandra.protocol.version`</td>
-     <td>Cassandra binary protocol version</td>
-     <td>4</td>
+     <td>Cassandra binary protocol version (`3`, `4`, `DSE1`, `DSE2`)</td>
+     <td>`DEFAULT` (detected automatically)</td>
    </tr>
    <tr>
      <td>cassandra.query.default.consistency</td>
@@ -743,10 +703,9 @@ Below are the configuration parameters and their default values.
      <td>`cassandra.reconnection.policy`</td>
      <td>
         Cassandra Reconnection Policy.
-        Default = `new ExponentialReconnectionPolicy(1000, 10 * 60 * 1000)`
+        Default = `ExponentialReconnectionPolicy`.
         To specify your own policy, provide the <em>fully qualified class name (FQCN)</em> of your policy.
-        At runtime the interpreter will instantiate the policy using
-        <strong>Class.forName(FQCN)</strong>
+        At runtime the driver will instantiate the policy using its class name.
      </td>
      <td>DEFAULT</td>
    </tr>
@@ -754,10 +713,9 @@ Below are the configuration parameters and their default values.
      <td>`cassandra.retry.policy`</td>
      <td>
         Cassandra Retry Policy.
-        Default = `DefaultRetryPolicy.INSTANCE`
+        Default = `DefaultRetryPolicy`.
         To specify your own policy, provide the <em>fully qualified class name (FQCN)</em> of your policy.
-        At runtime the interpreter will instantiate the policy using
-        <strong>Class.forName(FQCN)</strong>
+        At runtime the driver will instantiate the policy using its class name.
      </td>
      <td>DEFAULT</td>
    </tr>
@@ -780,10 +738,9 @@ Below are the configuration parameters and their default values.
      <td>`cassandra.speculative.execution.policy`</td>
      <td>
         Cassandra Speculative Execution Policy.
-        Default = `NoSpeculativeExecutionPolicy.INSTANCE`
+        Default = `NoSpeculativeExecutionPolicy`.
         To specify your own policy, provide the <em>fully qualified class name (FQCN)</em> of your policy.
-        At runtime the interpreter will instantiate the policy using
-        <strong>Class.forName(FQCN)</strong>
+        At runtime the driver will instantiate the policy using its class name.
      </td>
      <td>DEFAULT</td>
    </tr>
@@ -814,6 +771,15 @@ Below are the configuration parameters and their default values.
 
 ## Change Log
 
+**3.2** _(Zeppelin {{ site.ZEPPELIN_VERSION }})_ :
+
+* Refactor to use the unified Java driver 4.5
+  ([ZEPPELIN-4378](https://issues.apache.org/jira/browse/ZEPPELIN-4378)):
+  * changes in configuration were necessary, as the new driver has a different architecture and
+    configuration options;
+  * the interpreter got support for DSE-specific data types and other extensions;
+  * support for `@retryPolicy` is removed, as only a single retry policy is shipped with the driver.
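+
+As several pooling properties were renamed in this release, existing interpreter settings may need updating; a sketch of the mapping (drawn from the renames above, values shown only as examples):
+
+```
+# driver 3.x era name                              -> driver 4.x era name
+cassandra.pooling.core.connection.per.host.local   -> cassandra.pooling.connection.per.host.local
+cassandra.pooling.core.connection.per.host.remote  -> cassandra.pooling.connection.per.host.remote
+cassandra.pooling.max.request.per.connection.local -> cassandra.pooling.max.request.per.connection
+```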
+
 **3.1** _(Zeppelin {{ site.ZEPPELIN_VERSION }})_ :
 
 * Upgrade Java driver to 3.7.2 ([ZEPPELIN-4331](https://issues.apache.org/jira/browse/ZEPPELIN-4331));
@@ -844,14 +810,12 @@ Below are the configuration parameters and their default values.
 
 ## Bugs & Contacts
 
- If you encounter a bug for this interpreter, please create a **[JIRA]** ticket and ping me on Twitter
- at **[@doanduyhai]**
+ If you encounter a bug in this interpreter, please create a **[JIRA]** ticket.
 
-[Cassandra Java Driver]: https://github.com/datastax/java-driver
+[Datastax Java Driver for Apache Cassandra]: https://docs.datastax.com/en/developer/java-driver/latest/
 [standard CQL syntax]: http://docs.datastax.com/en/cql/3.1/cql/cql_using/use_collections_c.html
 [Tuple CQL syntax]: http://docs.datastax.com/en/cql/3.1/cql/cql_reference/tupleType.html
 [UDT CQL syntax]: http://docs.datastax.com/en/cql/3.1/cql/cql_using/cqlUseUDT.html
 [Zeppelin Dynamic Form](../usage/dynamic_form/intro.html)
 [Interpreter Binding Mode](../usage/interpreter/interpreter_binding_mode.html)
-[JIRA]: https://issues.apache.org/jira/browse/ZEPPELIN-382?jql=project%20%3D%20ZEPPELIN
-[@doanduyhai]: https://twitter.com/doanduyhai
+[JIRA]: https://issues.apache.org/jira/browse/ZEPPELIN
diff --git a/zeppelin-distribution/src/bin_license/LICENSE b/zeppelin-distribution/src/bin_license/LICENSE
index a861833..386398e 100644
--- a/zeppelin-distribution/src/bin_license/LICENSE
+++ b/zeppelin-distribution/src/bin_license/LICENSE
@@ -68,9 +68,9 @@ The following components are provided under Apache License.
     (Apache 2.0) xml apis (xml-apis:xml-apis:jar:1.4.01 - http://xerces.apache.org/xml-commons/components/external)
     (Apache 2.0) java-xmlbuilder (com.jamesmurty.utils:java-xmlbuilder:jar:1.0 - https://github.com/jmurty/java-xmlbuilder)
     (Apache 2.0) compress-lzf (com.ning:compress-lzf:jar:1.0.3 - https://github.com/ning/compress) Copyright 2009-2010 Ning, Inc.
-    (Apache 2.0) java-driver-core (com.datastax.oss:java-driver-core:jar:3.7.2 - https://github.com/datastax/java-driver)
-    (Apache 2.0) Snappy-java (org.xerial.snappy:snappy-java:1.1.2.4 - https://github.com/xerial/snappy-java/)
-    (Apache 2.0) lz4-java (org.lz4:lz4-java:jar:1.4.1 - https://github.com/lz4/lz4-java)
+    (Apache 2.0) java-driver-core (com.datastax.oss:java-driver-core:jar:4.5.1 - https://github.com/datastax/java-driver)
+    (Apache 2.0) Snappy-java (org.xerial.snappy:snappy-java:1.1.7.3 - https://github.com/xerial/snappy-java/)
+    (Apache 2.0) lz4-java (org.lz4:lz4-java:jar:1.6.0 - https://github.com/lz4/lz4-java)
     (Apache 2.0) RoaringBitmap (org.roaringbitmap:RoaringBitmap:jar:0.5.11 - https://github.com/lemire/RoaringBitmap)
     (Apache 2.0) json4s (org.json4s:json4s-ast_2.10:jar:3.2.10 - https://github.com/json4s/json4s)
     (Apache 2.0) HPPC Collections (com.carrotsearch:hppc:0.7.1 - http://labs.carrotsearch.com/hppc.html/hppc)