You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by mo...@apache.org on 2015/08/05 08:17:05 UTC

[4/6] incubator-zeppelin git commit: ZEPPELIN-179: Cassandra Interpreter

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b9583c6e/cassandra/src/main/scala/org/apache/zeppelin/cassandra/EnhancedSession.scala
----------------------------------------------------------------------
diff --git a/cassandra/src/main/scala/org/apache/zeppelin/cassandra/EnhancedSession.scala b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/EnhancedSession.scala
new file mode 100644
index 0000000..c636de9
--- /dev/null
+++ b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/EnhancedSession.scala
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.cassandra
+
+import com.datastax.driver.core._
+import org.apache.zeppelin.cassandra.TextBlockHierarchy._
+import org.apache.zeppelin.interpreter.InterpreterException
+
+
+/**
+ * Enhance the Java driver session with special statements
+ * (describe cluster / keyspaces / tables / keyspace / table / type, and help).
+ * The output of those statements is a String prefixed with HTML_MAGIC;
+ * regular driver statements are executed directly on the wrapped session.
+ *
+ * @param session Java driver session used to read the schema metadata and to execute statements
+ */
+class EnhancedSession(val session: Session) {
+
+  val clusterDisplay = DisplaySystem.ClusterDisplay
+  val keyspaceDisplay = DisplaySystem.KeyspaceDisplay
+  val tableDisplay = DisplaySystem.TableDisplay
+  val udtDisplay = DisplaySystem.UDTDisplay
+  val helpDisplay = DisplaySystem.HelpDisplay
+  private val noResultDisplay = DisplaySystem.NoResultDisplay
+
+  /** Display prefix prepended to every output generated by this class */
+  val HTML_MAGIC = "%html \n"
+
+  val displayNoResult: String = HTML_MAGIC + noResultDisplay.formatNoResult
+
+  def displayExecutionStatistics(query: String, execInfo: ExecutionInfo): String = {
+    HTML_MAGIC + noResultDisplay.noResultWithExecutionInfo(query, execInfo)
+  }
+
+  /** Keyspace given in the command, else the keyspace the session is logged into, else "system" */
+  private def resolveKeyspace(explicitKeyspace: Option[String]): String =
+    explicitKeyspace.orElse(Option(session.getLoggedKeyspace)).getOrElse("system")
+
+  private def execute(describeCluster: DescribeClusterCmd): String = {
+    val metaData = session.getCluster.getMetadata
+    HTML_MAGIC + clusterDisplay.formatClusterOnly(describeCluster.statement, metaData)
+  }
+
+  private def execute(describeKeyspaces: DescribeKeyspacesCmd): String = {
+    val metaData = session.getCluster.getMetadata
+    HTML_MAGIC + clusterDisplay.formatClusterContent(describeKeyspaces.statement, metaData)
+  }
+
+  private def execute(describeTables: DescribeTablesCmd): String = {
+    val metadata: Metadata = session.getCluster.getMetadata
+    HTML_MAGIC + clusterDisplay.formatAllTables(describeTables.statement,metadata)
+  }
+
+  private def execute(describeKeyspace: DescribeKeyspaceCmd): String = {
+    val keyspace: String = describeKeyspace.keyspace
+
+    // getKeyspace returns null for an unknown keyspace: report it the same way as
+    // an unknown table or type instead of handing a null to the display code
+    Option(session.getCluster.getMetadata.getKeyspace(keyspace)) match {
+      case Some(keyspaceMeta) => HTML_MAGIC + keyspaceDisplay.formatKeyspaceContent(describeKeyspace.statement, keyspaceMeta)
+      case None => throw new InterpreterException(s"Cannot find keyspace $keyspace")
+    }
+  }
+
+  private def execute(describeTable: DescribeTableCmd): String = {
+    val metaData = session.getCluster.getMetadata
+    val tableName: String = describeTable.table
+    val keyspace: String = resolveKeyspace(describeTable.keyspace)
+
+    Option(metaData.getKeyspace(keyspace)).flatMap(ks => Option(ks.getTable(tableName))) match {
+      case Some(tableMeta) => HTML_MAGIC + tableDisplay.format(describeTable.statement, tableMeta, true)
+      case None => throw new InterpreterException(s"Cannot find table $keyspace.$tableName")
+    }
+  }
+
+  private def execute(describeUDT: DescribeUDTCmd): String = {
+    val metaData = session.getCluster.getMetadata
+    val keyspace: String = resolveKeyspace(describeUDT.keyspace)
+    val udtName: String = describeUDT.udtName
+
+    Option(metaData.getKeyspace(keyspace)).flatMap(ks => Option(ks.getUserType(udtName))) match {
+      case Some(userType) => HTML_MAGIC + udtDisplay.format(describeUDT.statement, userType, true)
+      case None => throw new InterpreterException(s"Cannot find type $keyspace.$udtName")
+    }
+  }
+
+  private def execute(helpCmd: HelpCmd): String = {
+    HTML_MAGIC + helpDisplay.formatHelp()
+  }
+
+  /**
+   * Executes a statement
+   *
+   * @param st a describe command, a help command or a Java driver Statement
+   * @return the generated output (String) for describe and help commands,
+   *         the result of session.execute for a driver Statement
+   * @throws InterpreterException for any other kind of input, or when the described
+   *                              keyspace, table or type cannot be found
+   */
+  def execute(st: Any): Any = {
+    st match {
+      case x:DescribeClusterCmd => execute(x)
+      case x:DescribeKeyspacesCmd => execute(x)
+      case x:DescribeTablesCmd => execute(x)
+      case x:DescribeKeyspaceCmd => execute(x)
+      case x:DescribeTableCmd => execute(x)
+      case x:DescribeUDTCmd => execute(x)
+      case x:HelpCmd => execute(x)
+      case x:Statement => session.execute(x)
+      case _ => throw new InterpreterException(s"Cannot execute statement '$st' of type ${st.getClass}")
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b9583c6e/cassandra/src/main/scala/org/apache/zeppelin/cassandra/InterpreterLogic.scala
----------------------------------------------------------------------
diff --git a/cassandra/src/main/scala/org/apache/zeppelin/cassandra/InterpreterLogic.scala b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/InterpreterLogic.scala
new file mode 100644
index 0000000..809bce7
--- /dev/null
+++ b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/InterpreterLogic.scala
@@ -0,0 +1,411 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.cassandra
+
+import java.io.{ByteArrayOutputStream, PrintStream}
+import java.net.InetAddress
+import java.nio.ByteBuffer
+import java.text.SimpleDateFormat
+import java.util
+import java.util.Date
+import java.util.concurrent.ConcurrentHashMap
+
+import com.datastax.driver.core.DataType.Name._
+import com.datastax.driver.core._
+import com.datastax.driver.core.exceptions.DriverException
+import com.datastax.driver.core.policies.{LoggingRetryPolicy, FallthroughRetryPolicy, DowngradingConsistencyRetryPolicy, Policies}
+import org.apache.zeppelin.cassandra.TextBlockHierarchy._
+import org.apache.zeppelin.display.Input.ParamOption
+import org.apache.zeppelin.interpreter.InterpreterResult.Code
+import org.apache.zeppelin.interpreter.{InterpreterException, InterpreterResult, InterpreterContext}
+import org.slf4j.LoggerFactory
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+
+/**
+ * Value object to store runtime query parameters.
+ * Every field is optional: applyQueryOptions only sets on the statement
+ * the options that are defined.
+ * @param consistency consistency level
+ * @param serialConsistency serial consistency level
+ * @param timestamp default timestamp of the statement
+ * @param retryPolicy retry policy marker, translated into a driver retry policy by applyQueryOptions
+ * @param fetchSize query fetch size
+ */
+case class CassandraQueryOptions(consistency: Option[ConsistencyLevel],
+                                 serialConsistency:Option[ConsistencyLevel],
+                                 timestamp: Option[Long],
+                                 retryPolicy: Option[RetryPolicy],
+                                 fetchSize: Option[Int])
+
+/**
+ * Singleton object to store the constants, the parsers and the map of
+ * prepared statements used by the InterpreterLogic instances
+ */
+object InterpreterLogic {
+  // Regular expressions for the {{variable=default}} and {{variable=choice1|choice2}} placeholders
+  val CHOICES_SEPARATOR : String = """\|"""
+  val VARIABLE_PATTERN = """\{\{[^}]+\}\}""".r
+  val SIMPLE_VARIABLE_DEFINITION_PATTERN = """\{\{([^=]+)=([^=]+)\}\}""".r
+  val MULTIPLE_CHOICES_VARIABLE_DEFINITION_PATTERN = """\{\{([^=]+)=((?:[^=]+\|)+[^|]+)\}\}""".r
+  // SimpleDateFormat patterns used by parseDate
+  val STANDARD_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss"
+  val ACCURATE_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss.SSS"
+  // Driver retry policies set on the statements by applyQueryOptions
+  val defaultRetryPolicy = Policies.defaultRetryPolicy()
+  val downgradingConsistencyRetryPolicy = DowngradingConsistencyRetryPolicy.INSTANCE
+  val fallThroughRetryPolicy = FallthroughRetryPolicy.INSTANCE
+  val loggingDefaultRetryPolicy = new LoggingRetryPolicy(defaultRetryPolicy)
+  val loggingDownGradingRetryPolicy = new LoggingRetryPolicy(downgradingConsistencyRetryPolicy)
+  val loggingFallThrougRetryPolicy = new LoggingRetryPolicy(fallThroughRetryPolicy)
+  // Prepared statements indexed by name; held by this singleton, not by an InterpreterLogic instance
+  val preparedStatements : mutable.Map[String,PreparedStatement] = new ConcurrentHashMap[String,PreparedStatement]().asScala
+
+  val logger = LoggerFactory.getLogger(classOf[InterpreterLogic])
+  // Parsers for the paragraph text and for the bound values of prepared statements
+  val paragraphParser = new ParagraphParser
+  val boundValuesParser = new BoundValuesParser
+  
+}
+
+/**
+ * Real class to implement the
+ * interpreting logic of CQL statements
+ * and parameters blocks
+ *
+ * @param session java driver session
+ */
+class InterpreterLogic(val session: Session)  {
+
+  val enhancedSession: EnhancedSession = new EnhancedSession(session)
+
+  import InterpreterLogic._
+
+  def interpret(session:Session, stringStatements : String, context: InterpreterContext): InterpreterResult = {
+    /*
+     * Interprets the text of one paragraph: parses it into parameter and statement blocks,
+     * updates the map of prepared statements, executes the other statements in order and
+     * builds the result from the last one. Exceptions (java.lang.Exception and subclasses)
+     * are logged and returned as an ERROR result instead of being rethrown.
+     *
+     * NOTE(review): the `session` parameter shadows the constructor field of the same name.
+     * Statements are prepared with the parameter but executed through enhancedSession, which
+     * wraps the constructor field - confirm that callers always pass the same session.
+     */
+    logger.info(s"Executing CQL statements : \n\n$stringStatements\n")
+
+    try {
+      val protocolVersion = session.getCluster.getConfiguration.getProtocolOptions.getProtocolVersionEnum
+
+      val queries:List[AnyBlock] = parseInput(stringStatements)
+
+      val queryOptions = extractQueryOptions(queries
+        .filter(_.blockType == ParameterBlock)
+        .map(_.get[QueryParameters]))
+
+      logger.info(s"Current Cassandra query options = $queryOptions")
+
+      val queryStatements = queries.filter(_.blockType == StatementBlock).map(_.get[QueryStatement])
+
+      // Removals are applied first, whatever the position of the statement in the paragraph
+      queryStatements
+        .filter(_.statementType == RemovePrepareStatementType)
+        .map(_.getStatement[RemovePrepareStm])
+        .foreach(remove => {
+          logger.debug(s"Removing prepared statement '${remove.name}'")
+          preparedStatements.remove(remove.name)
+        })
+
+      // Then the missing statements are prepared; a name already present in the map is not prepared again
+      queryStatements
+        .filter(_.statementType == PrepareStatementType)
+        .map(_.getStatement[PrepareStm])
+        .foreach(statement => {
+          logger.debug(s"Get or prepare statement '${statement.name}' : ${statement.query}")
+          preparedStatements.getOrElseUpdate(statement.name,session.prepare(statement.query))
+        })
+      // Build the driver statements; describe and help commands are passed through as they are
+      val statements: List[Any] = queryStatements
+        .filter(st => (st.statementType != PrepareStatementType) && (st.statementType != RemovePrepareStatementType))
+        .map{
+          case x:SimpleStm => generateSimpleStatement(x, queryOptions, context)
+          case x:BatchStm => {
+            val builtStatements: List[Statement] = x.statements.map {
+              case st:SimpleStm => generateSimpleStatement(st, queryOptions, context)
+              case st:BoundStm => generateBoundStatement(st, queryOptions, context)
+              case _ => throw new InterpreterException(s"Unknown statement type")
+            }
+            generateBatchStatement(x.batchType, queryOptions, builtStatements)
+          }
+          case x:BoundStm => generateBoundStatement(x, queryOptions, context)
+          case x:DescribeCommandStatement => x
+          case x:HelpCmd => x
+          case x => throw new InterpreterException(s"Unknown statement type : ${x}")
+       }
+
+      val results: List[(Any,Any)] = for (statement <- statements) yield (enhancedSession.execute(statement),statement)
+      // Every statement has been executed at this point; only the result of the last one is displayed
+      if (results.nonEmpty) {
+        results.last match {
+          case(res: ResultSet, st: Statement) => buildResponseMessage((res, st), protocolVersion)
+          case(output: String, _) => new InterpreterResult(Code.SUCCESS, output)
+          case _ => throw new InterpreterException(s"Cannot parse result type : ${results.last}")
+        }
+
+      } else {
+        new InterpreterResult(Code.SUCCESS, enhancedSession.displayNoResult)
+      }
+
+    } catch {
+      case dex: DriverException => {
+        logger.error(dex.getMessage, dex)
+        new InterpreterResult(Code.ERROR, parseException(dex))
+      }
+      case pex:ParsingException => {
+        logger.error(pex.getMessage, pex)
+        new InterpreterResult(Code.ERROR, pex.getMessage)
+      }
+      case iex: InterpreterException => {
+        logger.error(iex.getMessage, iex)
+        new InterpreterResult(Code.ERROR, iex.getMessage)
+      }
+      case ex: java.lang.Exception => {
+        logger.error(ex.getMessage, ex)
+        new InterpreterResult(Code.ERROR, parseException(ex))
+      }
+    }
+  }
+
+  /**
+   * Builds the paragraph output for a result set: a %table with one line per row,
+   * or the execution statistics of the statement when the result set has no row.
+   *
+   * @param lastResultSet result set to display, with the statement that produced it
+   * @param protocolVersion protocol version used to deserialize the column values
+   * @return a SUCCESS result holding the generated text
+   */
+  def buildResponseMessage(lastResultSet: (ResultSet,Statement), protocolVersion: ProtocolVersion): InterpreterResult = {
+    val (resultSet, statement) = lastResultSet
+
+    // Read all the rows of the result set
+    val rows: List[Row] = resultSet.iterator().asScala.toList
+
+    val columns: List[(String, DataType)] = resultSet
+      .getColumnDefinitions
+      .asList
+      .asScala
+      .toList
+      .map(definition => (definition.getName, definition.getType))
+
+    val result: String =
+      if (rows.isEmpty) {
+        val lastQuery: String = statement.toString
+        enhancedSession.displayExecutionStatistics(lastQuery, resultSet.getExecutionInfo)
+      } else {
+        // Header line with the column names, then one tab separated line per row
+        val header = columns.map(_._1).mkString("%table ", "\t", "\n")
+        val lines = rows.map { row =>
+          columns.map { case (name, dataType) =>
+            if (row.isNull(name)) null else dataType.deserialize(row.getBytesUnsafe(name), protocolVersion)
+          }.mkString("", "\t", "\n")
+        }
+        header + lines.mkString
+      }
+
+    logger.debug(s"CQL result : \n\n$result\n")
+    new InterpreterResult(Code.SUCCESS, result)
+  }
+
+  /**
+   * Parses the text of a paragraph into blocks
+   *
+   * @param input text of the paragraph
+   * @return the parsed blocks
+   * @throws InterpreterException when the text cannot be parsed
+   */
+  def parseInput(input:String): List[AnyBlock] = {
+    // Same message for parser failures and parser errors
+    def missingSemiColon = new InterpreterException(s"Error parsing input:\n\t'$input'\nDid you forget to add ; (semi-colon) at the end of each CQL statement ?")
+
+    paragraphParser.parseAll(paragraphParser.queries, input) match {
+      case paragraphParser.Success(blocks, _) => blocks
+      case paragraphParser.Failure(_, _) => throw missingSemiColon
+      case paragraphParser.Error(_, _) => throw missingSemiColon
+      case _ => throw new InterpreterException(s"Error parsing input: $input")
+    }
+  }
+
+  /**
+   * Extracts the query options from the parameter blocks of a paragraph.
+   * When an option is given several times, the first non null value is kept.
+   *
+   * @param parameters parameter blocks of the paragraph
+   * @return the query options, None for each option that is not given
+   */
+  def extractQueryOptions(parameters: List[QueryParameters]): CassandraQueryOptions = {
+
+    logger.debug(s"Extracting query options from $parameters")
+
+    val consistency: Option[ConsistencyLevel] = parameters
+      .collect { case param if param.paramType == ConsistencyParam => param.getParam[Consistency] }
+      .flatMap(param => Option(param.value))
+      .headOption
+
+    val serialConsistency: Option[ConsistencyLevel] = parameters
+      .collect { case param if param.paramType == SerialConsistencyParam => param.getParam[SerialConsistency] }
+      .flatMap(param => Option(param.value))
+      .headOption
+
+    val timestamp: Option[Long] = parameters
+      .collect { case param if param.paramType == TimestampParam => param.getParam[Timestamp] }
+      .flatMap(param => Option(param.value))
+      .headOption
+
+    // The retry policy parameter is kept as is, it is translated by applyQueryOptions
+    val retryPolicy: Option[RetryPolicy] = parameters
+      .collect { case param if param.paramType == RetryPolicyParam => param.getParam[RetryPolicy] }
+      .headOption
+
+    val fetchSize: Option[Int] = parameters
+      .collect { case param if param.paramType == FetchSizeParam => param.getParam[FetchSize] }
+      .flatMap(param => Option(param.value))
+      .headOption
+
+    CassandraQueryOptions(consistency,serialConsistency, timestamp, retryPolicy, fetchSize)
+  }
+
+  /**
+   * Builds a driver SimpleStatement from the text of the statement, once its
+   * variables are replaced, and applies the query options to it
+   */
+  def generateSimpleStatement(st: SimpleStm, options: CassandraQueryOptions,context: InterpreterContext): SimpleStatement = {
+    logger.debug(s"Generating simple statement : '${st.text}'")
+    val queryString = maybeExtractVariables(st.text, context)
+    val simpleStatement = new SimpleStatement(queryString)
+    applyQueryOptions(options, simpleStatement)
+    simpleStatement
+  }
+
+  /**
+   * Builds a driver BoundStatement from a statement previously prepared under the
+   * same name, and applies the query options to it like for simple and batch statements.
+   *
+   * @param st name of the prepared statement and raw text of the values to bind
+   * @param options query options to apply to the bound statement
+   * @param context interpreter context, used to replace the variables of the bound values
+   * @throws InterpreterException when no statement has been prepared under this name
+   */
+  def generateBoundStatement(st: BoundStm, options: CassandraQueryOptions,context: InterpreterContext): BoundStatement = {
+    logger.debug(s"Generating bound statement with name : '${st.name}' and bound values : ${st.values}")
+    preparedStatements.get(st.name) match {
+      case Some(ps) => {
+        val boundValues = maybeExtractVariables(st.values, context)
+        val statement = createBoundStatement(st.name, ps, boundValues)
+        // The options used to be ignored for bound statements
+        applyQueryOptions(options, statement)
+        statement
+      }
+      case None => throw new InterpreterException(s"The statement '${st.name}' can not be bound to values. " +
+          s"Are you sure you did prepare it with @prepare[${st.name}] ?")
+    }
+  }
+
+  /**
+   * Builds a driver BatchStatement of the given type containing the statements,
+   * in order, and applies the query options to the batch
+   */
+  def generateBatchStatement(batchType: BatchStatement.Type, options: CassandraQueryOptions, statements: List[Statement]): BatchStatement = {
+    logger.debug(s"""Generating batch statement of type '${batchType} for ${statements.mkString(",")}'""")
+    val batchStatement = new BatchStatement(batchType)
+    for (statement <- statements) batchStatement.add(statement)
+    applyQueryOptions(options, batchStatement)
+    batchStatement
+  }
+
+  /**
+   * Replaces every {{variable=defaultValue}} or {{variable=value1|value2|...|valueN}}
+   * placeholder of the statement by the value returned by context.getGui
+   * (select for multiple choices, input otherwise).
+   *
+   * @param statement text containing zero or more placeholders
+   * @param context interpreter context giving access to the GUI
+   * @return the text with all the placeholders replaced
+   * @throws ParsingException when a placeholder matches none of the two accepted forms
+   */
+  def maybeExtractVariables(statement: String, context: InterpreterContext): String = {
+
+    def extractVariableAndDefaultValue(statement: String, exp: String):String = {
+      val value = exp match {
+        case MULTIPLE_CHOICES_VARIABLE_DEFINITION_PATTERN(variable,choices) => {
+          val listChoices:List[String] = choices.trim.split(CHOICES_SEPARATOR).toList
+          val paramOptions= listChoices.map(choice => new ParamOption(choice, choice))
+          context.getGui.select(variable, listChoices.head, paramOptions.toArray)
+        }
+        case SIMPLE_VARIABLE_DEFINITION_PATTERN(variable,defaultVal) => {
+          context.getGui.input(variable,defaultVal)
+        }
+        case _ => throw new ParsingException(s"Invalid bound variable definition for '$exp' in '$statement'. It should be of form 'variable=defaultValue' or 'variable=value1|value2|...|valueN'")
+      }
+
+      // Literal replacement of all the occurrences: with replaceAll the placeholder was used as
+      // a regular expression (only {, } and | were escaped) and '$' or '\' in the value
+      // were interpreted as group references or escapes
+      statement.replace(exp, value.toString)
+    }
+
+    VARIABLE_PATTERN.findAllIn(statement).foldLeft(statement)(extractVariableAndDefaultValue _)
+  }
+
+  /**
+   * Sets on the statement each query option that is defined; the other settings of
+   * the statement are left untouched.
+   *
+   * @throws InterpreterException when the retry policy is not one of the known policies
+   */
+  def applyQueryOptions(options: CassandraQueryOptions, statement: Statement): Unit = {
+    for (level <- options.consistency) statement.setConsistencyLevel(level)
+    for (level <- options.serialConsistency) statement.setSerialConsistencyLevel(level)
+    for (time <- options.timestamp) statement.setDefaultTimestamp(time)
+    for (policy <- options.retryPolicy) {
+      // Translate the parsed retry policy into the matching driver policy
+      val driverPolicy = policy match {
+        case DefaultRetryPolicy => defaultRetryPolicy
+        case DowngradingRetryPolicy => downgradingConsistencyRetryPolicy
+        case FallThroughRetryPolicy => fallThroughRetryPolicy
+        case LoggingDefaultRetryPolicy => loggingDefaultRetryPolicy
+        case LoggingDowngradingRetryPolicy => loggingDownGradingRetryPolicy
+        case LoggingFallThroughRetryPolicy => loggingFallThrougRetryPolicy
+        case _ => throw new InterpreterException(s"""Unknown retry policy ${options.retryPolicy.getOrElse("???")}""")
+      }
+      statement.setRetryPolicy(driverPolicy)
+    }
+    for (size <- options.fetchSize) statement.setFetchSize(size)
+  }
+
+  /**
+   * Creates a bound statement from the raw text of the bound values: the text is split
+   * into values and each value is converted according to the type of the matching
+   * variable of the prepared statement.
+   *
+   * @param name name of the prepared statement, used in the error messages
+   * @param ps prepared statement to bind
+   * @param rawBoundValues raw text of the bound values
+   * @throws InterpreterException when the number of values does not match the number of
+   *                              variables, or when the type of a variable is not supported
+   */
+  private def createBoundStatement(name: String, ps: PreparedStatement, rawBoundValues: String): BoundStatement = {
+    val dataTypes = ps.getVariables.toList
+      .map(cfDef => cfDef.getType)
+
+    val boundValuesAsText = parseBoundValues(name,rawBoundValues)
+
+    if(dataTypes.size != boundValuesAsText.size) throw new InterpreterException(s"Invalid @bind values for prepared statement '$name'. " +
+      s"Prepared parameters has ${dataTypes.size} variables whereas bound values have ${boundValuesAsText.size} parameters ...")
+
+    val convertedValues: List[AnyRef] = boundValuesAsText
+      .zip(dataTypes)
+      .map { case (value, dataType) => convertBoundValue(value, dataType) }
+
+    ps.bind(convertedValues.toArray: _*)
+  }
+
+  /**
+   * Converts the text of one bound value into the Java object passed to the driver for
+   * the given CQL type. The text "null" gives a null value. Primitive values are boxed
+   * explicitly so that the result can be passed to PreparedStatement.bind without a cast.
+   */
+  private def convertBoundValue(value: String, dataType: DataType): AnyRef = {
+    if(value.trim == "null") {
+      null
+    } else {
+      dataType.getName match {
+        case (ASCII | TEXT | VARCHAR) => value.trim.replaceAll("(?<!')'","")
+        case INT => Int.box(value.trim.toInt)
+        // varint is an arbitrary precision integer: the driver expects a java.math.BigInteger
+        case VARINT => new java.math.BigInteger(value.trim)
+        case (BIGINT | COUNTER) => Long.box(value.trim.toLong)
+        case BLOB => ByteBuffer.wrap(value.trim.getBytes)
+        case BOOLEAN => Boolean.box(value.trim.toBoolean)
+        // the driver expects a java.math.BigDecimal, not a scala.math.BigDecimal
+        case DECIMAL => new java.math.BigDecimal(value.trim)
+        case DOUBLE => Double.box(value.trim.toDouble)
+        case FLOAT => Float.box(value.trim.toFloat)
+        case INET => InetAddress.getByName(value.trim)
+        case TIMESTAMP => parseDate(value.trim)
+        case (UUID | TIMEUUID) => java.util.UUID.fromString(value.trim)
+        case LIST => dataType.parse(boundValuesParser.parse(boundValuesParser.list, value).get)
+        case SET => dataType.parse(boundValuesParser.parse(boundValuesParser.set, value).get)
+        case MAP => dataType.parse(boundValuesParser.parse(boundValuesParser.map, value).get)
+        case UDT => dataType.parse(boundValuesParser.parse(boundValuesParser.udt, value).get)
+        case TUPLE => dataType.parse(boundValuesParser.parse(boundValuesParser.tuple, value).get)
+        case _ => throw new InterpreterException(s"Cannot parse data of type : ${dataType.toString}")
+      }
+    }
+  }
+
+  /**
+   * Splits the raw text of the bound values into one text per value
+   *
+   * @param psName name of the prepared statement, used in the error message
+   * @param boundValues raw text of the bound values
+   * @throws InterpreterException when the text cannot be parsed
+   */
+  protected def parseBoundValues(psName: String, boundValues: String): List[String] = {
+    boundValuesParser.parseAll(boundValuesParser.values, boundValues) match {
+      case boundValuesParser.Success(parsedValues, _) => parsedValues
+      case _ => throw new InterpreterException(s"Cannot parse bound values for prepared statement '$psName' : $boundValues. Did you forget to wrap text with ' (simple quote) ?")
+    }
+  }
+
+  /**
+   * Parses a date written in one of the two accepted formats
+   *
+   * @param dateString text matching the standard or the accurate date pattern of the bound values parser
+   * @throws InterpreterException when the text matches none of the two patterns
+   */
+  def parseDate(dateString: String): Date = {
+    // Select the format according to the pattern matched by the text
+    val (dateFormat, dateText) = dateString match {
+      case boundValuesParser.STANDARD_DATE_PATTERN(datePattern) => (STANDARD_DATE_FORMAT, datePattern)
+      case boundValuesParser.ACCURATE_DATE_PATTERN(datePattern) => (ACCURATE_DATE_FORMAT, datePattern)
+      case _ => throw new InterpreterException(s"Cannot parse date '$dateString'. " +
+        s"Accepted formats : $STANDARD_DATE_FORMAT OR $ACCURATE_DATE_FORMAT")
+    }
+    new SimpleDateFormat(dateFormat).parse(dateText)
+  }
+
+  /**
+   * Returns the stack trace of the exception as a String
+   *
+   * @param ex exception to print
+   */
+  def parseException(ex: Exception): String = {
+    val os = new ByteArrayOutputStream()
+    // Write and read the bytes with the same charset: the PrintStream used to encode with
+    // the platform default charset whereas the bytes were decoded as UTF-8
+    val ps = new PrintStream(os, true, "UTF-8")
+    try ex.printStackTrace(ps) finally ps.close()
+    os.toString("UTF-8")
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b9583c6e/cassandra/src/main/scala/org/apache/zeppelin/cassandra/JavaDriverConfig.scala
----------------------------------------------------------------------
diff --git a/cassandra/src/main/scala/org/apache/zeppelin/cassandra/JavaDriverConfig.scala b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/JavaDriverConfig.scala
new file mode 100644
index 0000000..d64ad90
--- /dev/null
+++ b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/JavaDriverConfig.scala
@@ -0,0 +1,430 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.cassandra
+
+import java.lang.Boolean._
+
+import com.datastax.driver.core.HostDistance._
+import com.datastax.driver.core.ProtocolOptions.Compression
+import com.datastax.driver.core._
+import com.datastax.driver.core.policies._
+import org.apache.commons.lang3.StringUtils._
+import org.apache.zeppelin.interpreter.Interpreter
+import org.apache.zeppelin.cassandra.CassandraInterpreter._
+import org.slf4j.LoggerFactory
+
+/**
+ * Utility class to extract and configure the Java driver
+ */
+class JavaDriverConfig {
+
+  val LOGGER = LoggerFactory.getLogger(classOf[JavaDriverConfig])
+
+  /**
+   * Builds the driver socket options from the properties of the interpreter and logs,
+   * at debug level, the values that have been set.
+   * The connection timeout, the read timeout and the TCP no delay flag are always read;
+   * the other options are only set when their property is not blank.
+   */
+  def getSocketOptions(intpr: Interpreter): SocketOptions = {
+    val options = new SocketOptions
+    val summary = new StringBuilder("Socket options : \n\n")
+
+    // Adds one line "<property> : <value>" to the logged summary
+    def record(property: String, value: Any): Unit =
+      summary.append("\t").append(property).append(" : ").append(value).append("\n")
+
+    // Parses, sets and records an option whose property is not blank
+    def ifDefined[T](property: String)(parse: String => T)(configure: T => Any): Unit = {
+      val rawValue: String = intpr.getProperty(property)
+      if (isNotBlank(rawValue)) {
+        val parsedValue = parse(rawValue)
+        configure(parsedValue)
+        record(property, parsedValue)
+      }
+    }
+
+    val connectTimeoutMillis: Int = intpr.getProperty(CASSANDRA_SOCKET_CONNECTION_TIMEOUT_MILLIS).toInt
+    options.setConnectTimeoutMillis(connectTimeoutMillis)
+    record(CASSANDRA_SOCKET_CONNECTION_TIMEOUT_MILLIS, connectTimeoutMillis)
+
+    val readTimeoutMillis: Int = intpr.getProperty(CASSANDRA_SOCKET_READ_TIMEOUT_MILLIS).toInt
+    options.setReadTimeoutMillis(readTimeoutMillis)
+    record(CASSANDRA_SOCKET_READ_TIMEOUT_MILLIS, readTimeoutMillis)
+
+    val tcpNoDelay: Boolean = parseBoolean(intpr.getProperty(CASSANDRA_SOCKET_TCP_NO_DELAY))
+    options.setTcpNoDelay(tcpNoDelay)
+    record(CASSANDRA_SOCKET_TCP_NO_DELAY, tcpNoDelay)
+
+    ifDefined(CASSANDRA_SOCKET_KEEP_ALIVE)(parseBoolean(_))(options.setKeepAlive(_))
+    ifDefined(CASSANDRA_SOCKET_RECEIVED_BUFFER_SIZE_BYTES)(_.toInt)(options.setReceiveBufferSize(_))
+    ifDefined(CASSANDRA_SOCKET_SEND_BUFFER_SIZE_BYTES)(_.toInt)(options.setSendBufferSize(_))
+    ifDefined(CASSANDRA_SOCKET_REUSE_ADDRESS)(parseBoolean(_))(options.setReuseAddress(_))
+    ifDefined(CASSANDRA_SOCKET_SO_LINGER)(_.toInt)(options.setSoLinger(_))
+
+    LOGGER.debug(summary.append("\n").toString)
+
+    options
+  }
+
+  /**
+   * Builds the driver query options (consistency, serial consistency, fetch size and
+   * default idempotence) from the properties of the interpreter and logs them at debug level
+   */
+  def getQueryOptions(intpr: Interpreter): QueryOptions = {
+    val options = new QueryOptions
+    val summary = new StringBuilder("Query options : \n\n")
+
+    // Adds one line "<property> : <value>" to the logged summary
+    def record(property: String, value: Any): Unit =
+      summary.append("\t").append(property).append(" : ").append(value).append("\n")
+
+    val consistencyLevel = ConsistencyLevel.valueOf(intpr.getProperty(CASSANDRA_QUERY_DEFAULT_CONSISTENCY))
+    options.setConsistencyLevel(consistencyLevel)
+    record(CASSANDRA_QUERY_DEFAULT_CONSISTENCY, consistencyLevel)
+
+    val serialConsistencyLevel = ConsistencyLevel.valueOf(intpr.getProperty(CASSANDRA_QUERY_DEFAULT_SERIAL_CONSISTENCY))
+    options.setSerialConsistencyLevel(serialConsistencyLevel)
+    record(CASSANDRA_QUERY_DEFAULT_SERIAL_CONSISTENCY, serialConsistencyLevel)
+
+    val fetchSize: Int = intpr.getProperty(CASSANDRA_QUERY_DEFAULT_FETCH_SIZE).toInt
+    options.setFetchSize(fetchSize)
+    record(CASSANDRA_QUERY_DEFAULT_FETCH_SIZE, fetchSize)
+
+    val defaultIdempotence: Boolean = parseBoolean(intpr.getProperty(CASSANDRA_QUERY_DEFAULT_IDEMPOTENCE))
+    options.setDefaultIdempotence(defaultIdempotence)
+    record(CASSANDRA_QUERY_DEFAULT_IDEMPOTENCE, defaultIdempotence)
+
+    LOGGER.debug(summary.append("\n").toString)
+
+    options
+  }
+
+  /**
+   * Returns the protocol version configured for the interpreter ("1", "2" or "3",
+   * NEWEST_SUPPORTED for any other value) and, as a side effect, assigns the
+   * DEFAULT_* pooling values matching this version.
+   */
+  def getProtocolVersion(intpr: Interpreter): ProtocolVersion = {
+    val protocolVersion: String = intpr.getProperty(CASSANDRA_PROTOCOL_VERSION)
+    LOGGER.debug("Protocol version : " + protocolVersion)
+
+    // Pooling defaults assigned for the versions "1" and "2"
+    def assignV1V2PoolingDefaults(): Unit = {
+      DEFAULT_MAX_CONNECTION_PER_HOST_LOCAL = "8"
+      DEFAULT_MAX_CONNECTION_PER_HOST_REMOTE = "2"
+      DEFAULT_CORE_CONNECTION_PER_HOST_LOCAL = "2"
+      DEFAULT_CORE_CONNECTION_PER_HOST_REMOTE = "1"
+      DEFAULT_NEW_CONNECTION_THRESHOLD_LOCAL = "100"
+      DEFAULT_NEW_CONNECTION_THRESHOLD_REMOTE = "1"
+      DEFAULT_MAX_REQUEST_PER_CONNECTION_LOCAL = "128"
+      DEFAULT_MAX_REQUEST_PER_CONNECTION_REMOTE = "128"
+    }
+
+    // Pooling defaults assigned for the version "3" and for any other value
+    def assignV3PoolingDefaults(): Unit = {
+      DEFAULT_MAX_CONNECTION_PER_HOST_LOCAL = "1"
+      DEFAULT_MAX_CONNECTION_PER_HOST_REMOTE = "1"
+      DEFAULT_CORE_CONNECTION_PER_HOST_LOCAL = "1"
+      DEFAULT_CORE_CONNECTION_PER_HOST_REMOTE = "1"
+      DEFAULT_NEW_CONNECTION_THRESHOLD_LOCAL = "800"
+      DEFAULT_NEW_CONNECTION_THRESHOLD_REMOTE = "200"
+      DEFAULT_MAX_REQUEST_PER_CONNECTION_LOCAL = "1024"
+      DEFAULT_MAX_REQUEST_PER_CONNECTION_REMOTE = "256"
+    }
+
+    protocolVersion match {
+      case "1" =>
+        assignV1V2PoolingDefaults()
+        ProtocolVersion.V1
+      case "2" =>
+        assignV1V2PoolingDefaults()
+        ProtocolVersion.V2
+      case "3" =>
+        assignV3PoolingDefaults()
+        ProtocolVersion.V3
+      case _ =>
+        assignV3PoolingDefaults()
+        ProtocolVersion.NEWEST_SUPPORTED
+    }
+  }
+
+  /**
+   * Build the driver pooling options from the interpreter properties.
+   *
+   * Every setting is read as a string property and converted with `toInt`,
+   * so a non-numeric value makes the conversion throw. The settings are
+   * applied in a fixed order and the whole set is logged at debug level.
+   *
+   * @param intpr interpreter providing the CASSANDRA_POOLING_* properties
+   * @return a new PoolingOptions carrying all configured values
+   */
+  def getPoolingOptions(intpr: Interpreter): PoolingOptions = {
+    val options: PoolingOptions = new PoolingOptions
+    val summary: StringBuilder = new StringBuilder("Pooling options : \n\n")
+
+    // Read one integer property, hand it to its setter, then record it in the debug summary
+    def configure(propertyName: String)(applySetting: Int => Any): Unit = {
+      val value: Int = intpr.getProperty(propertyName).toInt
+      applySetting(value)
+      summary.append("\t").append(propertyName).append(" : ").append(value).append("\n")
+    }
+
+    configure(CASSANDRA_POOLING_MAX_CONNECTION_PER_HOST_LOCAL) { value =>
+      options.setMaxConnectionsPerHost(LOCAL, value)
+    }
+    configure(CASSANDRA_POOLING_MAX_CONNECTION_PER_HOST_REMOTE) { value =>
+      options.setMaxConnectionsPerHost(REMOTE, value)
+    }
+    configure(CASSANDRA_POOLING_CORE_CONNECTION_PER_HOST_LOCAL) { value =>
+      options.setCoreConnectionsPerHost(LOCAL, value)
+    }
+    configure(CASSANDRA_POOLING_CORE_CONNECTION_PER_HOST_REMOTE) { value =>
+      options.setCoreConnectionsPerHost(REMOTE, value)
+    }
+    configure(CASSANDRA_POOLING_NEW_CONNECTION_THRESHOLD_LOCAL) { value =>
+      options.setNewConnectionThreshold(LOCAL, value)
+    }
+    configure(CASSANDRA_POOLING_NEW_CONNECTION_THRESHOLD_REMOTE) { value =>
+      options.setNewConnectionThreshold(REMOTE, value)
+    }
+    configure(CASSANDRA_POOLING_MAX_REQUESTS_PER_CONNECTION_LOCAL) { value =>
+      options.setMaxRequestsPerConnection(LOCAL, value)
+    }
+    configure(CASSANDRA_POOLING_MAX_REQUESTS_PER_CONNECTION_REMOTE) { value =>
+      options.setMaxRequestsPerConnection(REMOTE, value)
+    }
+    configure(CASSANDRA_POOLING_HEARTBEAT_INTERVAL_SECONDS) { value =>
+      options.setHeartbeatIntervalSeconds(value)
+    }
+    configure(CASSANDRA_POOLING_IDLE_TIMEOUT_SECONDS) { value =>
+      options.setIdleTimeoutSeconds(value)
+    }
+    configure(CASSANDRA_POOLING_POOL_TIMEOUT_MILLIS) { value =>
+      options.setPoolTimeoutMillis(value)
+    }
+
+    LOGGER.debug(summary.append("\n").toString)
+
+    options
+  }
+
+  /**
+   * Resolve the compression algorithm configured on the interpreter.
+   *
+   * The property value is matched case-insensitively against NONE, SNAPPY
+   * and LZ4. A missing property or an unrecognised value yields
+   * Compression.NONE, so this method never returns null.
+   *
+   * @param intpr interpreter whose CASSANDRA_COMPRESSION_PROTOCOL property is read
+   * @return the matching compression, Compression.NONE by default
+   */
+  def getCompressionProtocol(intpr: Interpreter): ProtocolOptions.Compression = {
+    val compressionProtocol: String = intpr.getProperty(CASSANDRA_COMPRESSION_PROTOCOL)
+
+    LOGGER.debug("Compression protocol : " + compressionProtocol)
+
+    // Option(...) maps a missing (null) property to None, which falls through to NONE below
+    Option(compressionProtocol).map(_.toUpperCase) match {
+      case Some("SNAPPY") => Compression.SNAPPY
+      case Some("LZ4") => Compression.LZ4
+      case _ => Compression.NONE
+    }
+  }
+
+  /**
+   * Resolve the load balancing policy configured on the interpreter.
+   *
+   * A blank property or DEFAULT_POLICY selects the driver default. Any other
+   * value is treated as a class name and instantiated reflectively through
+   * its no-argument constructor.
+   *
+   * @param intpr interpreter whose CASSANDRA_LOAD_BALANCING_POLICY property is read
+   * @return the driver default policy or a new instance of the configured class
+   * @throws RuntimeException if the configured class cannot be loaded or instantiated;
+   *                          the original failure is attached as the cause
+   */
+  def getLoadBalancingPolicy(intpr: Interpreter): LoadBalancingPolicy = {
+    val loadBalancingPolicy: String = intpr.getProperty(CASSANDRA_LOAD_BALANCING_POLICY)
+    LOGGER.debug("Load Balancing Policy : " + loadBalancingPolicy)
+
+    if (isBlank(loadBalancingPolicy) || (DEFAULT_POLICY == loadBalancingPolicy)) {
+      Policies.defaultLoadBalancingPolicy
+    }
+    else {
+      try {
+        (Class.forName(loadBalancingPolicy).asInstanceOf[Class[LoadBalancingPolicy]]).newInstance
+      }
+      catch {
+        // Reflection and linkage failures are wrapped; other errors (e.g. out of memory) propagate untouched
+        case e @ (_: Exception | _: LinkageError) =>
+          throw new RuntimeException("Cannot instantiate " + CASSANDRA_LOAD_BALANCING_POLICY + " = " + loadBalancingPolicy, e)
+      }
+    }
+  }
+
+  /**
+   * Resolve the retry policy configured on the interpreter.
+   *
+   * A blank property or DEFAULT_POLICY selects the driver default. Any other
+   * value is treated as a class name and instantiated reflectively through
+   * its no-argument constructor.
+   *
+   * @param intpr interpreter whose CASSANDRA_RETRY_POLICY property is read
+   * @return the driver default policy or a new instance of the configured class
+   * @throws RuntimeException if the configured class cannot be loaded or instantiated;
+   *                          the original failure is attached as the cause
+   */
+  def getRetryPolicy(intpr: Interpreter): RetryPolicy = {
+    val retryPolicy: String = intpr.getProperty(CASSANDRA_RETRY_POLICY)
+    LOGGER.debug("Retry Policy : " + retryPolicy)
+
+    if (isBlank(retryPolicy) || (DEFAULT_POLICY == retryPolicy)) {
+      Policies.defaultRetryPolicy
+    }
+    else {
+      try {
+        (Class.forName(retryPolicy).asInstanceOf[Class[RetryPolicy]]).newInstance
+      }
+      catch {
+        // Reflection and linkage failures are wrapped; other errors (e.g. out of memory) propagate untouched
+        case e @ (_: Exception | _: LinkageError) =>
+          throw new RuntimeException("Cannot instantiate " + CASSANDRA_RETRY_POLICY + " = " + retryPolicy, e)
+      }
+    }
+  }
+
+  /**
+   * Resolve the reconnection policy configured on the interpreter.
+   *
+   * A blank property or DEFAULT_POLICY selects the driver default. Any other
+   * value is treated as a class name and instantiated reflectively through
+   * its no-argument constructor.
+   *
+   * @param intpr interpreter whose CASSANDRA_RECONNECTION_POLICY property is read
+   * @return the driver default policy or a new instance of the configured class
+   * @throws RuntimeException if the configured class cannot be loaded or instantiated;
+   *                          the original failure is attached as the cause
+   */
+  def getReconnectionPolicy(intpr: Interpreter): ReconnectionPolicy = {
+    val reconnectionPolicy: String = intpr.getProperty(CASSANDRA_RECONNECTION_POLICY)
+    LOGGER.debug("Reconnection Policy : " + reconnectionPolicy)
+
+    if (isBlank(reconnectionPolicy) || (DEFAULT_POLICY == reconnectionPolicy)) {
+      Policies.defaultReconnectionPolicy
+    }
+    else {
+      try {
+        (Class.forName(reconnectionPolicy).asInstanceOf[Class[ReconnectionPolicy]]).newInstance
+      }
+      catch {
+        // Reflection and linkage failures are wrapped; other errors (e.g. out of memory) propagate untouched
+        case e @ (_: Exception | _: LinkageError) =>
+          throw new RuntimeException("Cannot instantiate " + CASSANDRA_RECONNECTION_POLICY + " = " + reconnectionPolicy, e)
+      }
+    }
+  }
+
+  /**
+   * Resolve the speculative execution policy configured on the interpreter.
+   *
+   * A blank property or DEFAULT_POLICY selects the driver default. Any other
+   * value is treated as a class name and instantiated reflectively through
+   * its no-argument constructor.
+   *
+   * @param intpr interpreter whose CASSANDRA_SPECULATIVE_EXECUTION_POLICY property is read
+   * @return the driver default policy or a new instance of the configured class
+   * @throws RuntimeException if the configured class cannot be loaded or instantiated;
+   *                          the original failure is attached as the cause
+   */
+  def getSpeculativeExecutionPolicy(intpr: Interpreter): SpeculativeExecutionPolicy = {
+    val specExecPolicy: String = intpr.getProperty(CASSANDRA_SPECULATIVE_EXECUTION_POLICY)
+    LOGGER.debug("Speculative Execution Policy : " + specExecPolicy)
+
+    if (isBlank(specExecPolicy) || (DEFAULT_POLICY == specExecPolicy)) {
+      Policies.defaultSpeculativeExecutionPolicy
+    }
+    else {
+      try {
+        (Class.forName(specExecPolicy).asInstanceOf[Class[SpeculativeExecutionPolicy]]).newInstance
+      }
+      catch {
+        // Reflection and linkage failures are wrapped; other errors (e.g. out of memory) propagate untouched
+        case e @ (_: Exception | _: LinkageError) =>
+          throw new RuntimeException("Cannot instantiate " + CASSANDRA_SPECULATIVE_EXECUTION_POLICY + " = " + specExecPolicy, e)
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b9583c6e/cassandra/src/main/scala/org/apache/zeppelin/cassandra/MetaDataHierarchy.scala
----------------------------------------------------------------------
diff --git a/cassandra/src/main/scala/org/apache/zeppelin/cassandra/MetaDataHierarchy.scala b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/MetaDataHierarchy.scala
new file mode 100644
index 0000000..66a0776
--- /dev/null
+++ b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/MetaDataHierarchy.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.cassandra
+
+import java.util.UUID
+
+import com.datastax.driver.core.utils.UUIDs
+import com.datastax.driver.core.{DataType, TableMetadata}
+
+import scala.util.parsing.json.JSONObject
+
+/**
+ * Define a hierarchy for CQL meta data
+ */
+object MetaDataHierarchy {
+  /** Translates the driver's clustering order enum into the local ClusteringOrder objects. */
+  object OrderConverter {
+    // Only ASC and DESC are handled; any other value would raise a MatchError
+    def convert(clusteringOrder: TableMetadata.Order): ClusteringOrder = {
+      clusteringOrder match {
+        case TableMetadata.Order.ASC => ASC
+        case TableMetadata.Order.DESC => DESC
+      }
+    }
+  }
+
+
+  /** Sort direction of a clustering column. */
+  sealed trait ClusteringOrder
+  object ASC extends ClusteringOrder
+  object DESC extends ClusteringOrder
+
+  /** Role a column plays inside its table definition. */
+  sealed trait ColumnType
+  object PartitionKey extends ColumnType
+  case class ClusteringColumn(order: ClusteringOrder) extends ColumnType
+  object StaticColumn extends ColumnType
+  object NormalColumn extends ColumnType
+  // NOTE(review): `info` presumably carries a textual description of the index - confirm with callers
+  case class IndexDetails(name: String, info: String)
+  /** A single column: its name, role, CQL data type and optional index. */
+  case class ColumnDetails(name: String, columnType: ColumnType, dataType: DataType, index: Option[IndexDetails])
+
+
+  case class ClusterDetails(name: String, partitioner: String)
+  // NOTE(review): meaning of the (UUID, String, String) tuples is not visible here - confirm with callers
+  case class ClusterContent(clusterName: String, clusterDetails: String, keyspaces: List[(UUID, String, String)])
+  case class AllTables(tables: Map[String,List[String]])
+  /** Keyspace description; `uniqueId` defaults to a freshly generated time-based UUID. */
+  case class KeyspaceDetails(name: String, replication: Map[String,String], durableWrites: Boolean, asCQL: String, uniqueId: UUID = UUIDs.timeBased()) {
+    /** Renders the replication map as JSON text with every double quote replaced by a single quote. */
+    def getReplicationMap: String = {
+      JSONObject(replication).toString().replaceAll(""""""","'")
+    }
+  }
+  case class KeyspaceContent(keyspaceName: String, keyspaceDetails: String, tables: List[(UUID,String, String)], udts: List[(UUID, String, String)])
+  /** Table description; `uniqueId` defaults to a freshly generated time-based UUID. */
+  case class TableDetails(tableName: String, columns: List[ColumnDetails], asCQL: String, uniqueId: UUID = UUIDs.timeBased())
+  /** User-defined type description; `uniqueId` defaults to a freshly generated time-based UUID. */
+  case class UDTDetails(typeName: String, columns: List[ColumnDetails], asCQL: String, uniqueId: UUID = UUIDs.timeBased())
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b9583c6e/cassandra/src/main/scala/org/apache/zeppelin/cassandra/ParagraphParser.scala
----------------------------------------------------------------------
diff --git a/cassandra/src/main/scala/org/apache/zeppelin/cassandra/ParagraphParser.scala b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/ParagraphParser.scala
new file mode 100644
index 0000000..0cd98ad
--- /dev/null
+++ b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/ParagraphParser.scala
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.cassandra
+
+import com.datastax.driver.core._
+import org.apache.zeppelin.cassandra.CassandraInterpreter._
+import org.apache.zeppelin.interpreter.InterpreterException
+import scala.util.parsing.combinator._
+import org.apache.zeppelin.cassandra.TextBlockHierarchy._
+
+/**
+ * Regular expressions used to validate and extract the content of each
+ * paragraph block (query parameters, statements, DESCRIBE and HELP commands).
+ */
+object ParagraphParser {
+  // Built from the driver's consistency levels whose name does not contain SERIAL
+  val CONSISTENCY_LEVEL_PATTERN = ConsistencyLevel.values().toList
+    .map(_.name()).filter(!_.contains("SERIAL")).mkString("""^\s*@consistency\s*=\s*(""", "|" , """)\s*$""").r
+
+  // Built from the driver's consistency levels whose name contains SERIAL
+  val SERIAL_CONSISTENCY_LEVEL_PATTERN = ConsistencyLevel.values().toList
+    .map(_.name()).filter(_.contains("SERIAL")).mkString("""^\s*@serialConsistency\s*=\s*(""", "|", """)\s*$""").r
+  val TIMESTAMP_PATTERN = """^\s*@timestamp\s*=\s*([0-9]+)\s*$""".r
+
+  val RETRY_POLICIES_PATTERN = List(DEFAULT_POLICY,DOWNGRADING_CONSISTENCY_RETRY, FALLTHROUGH_RETRY,
+    LOGGING_DEFAULT_RETRY, LOGGING_DOWNGRADING_RETRY, LOGGING_FALLTHROUGH_RETRY)
+    .mkString("""^\s*@retryPolicy\s*=\s*(""", "|" , """)\s*$""").r
+  val FETCHSIZE_PATTERN = """^\s*@fetchSize\s*=\s*([0-9]+)\s*$""".r
+
+  val SIMPLE_STATEMENT_PATTERN = """([^;]+;)""".r
+  val PREPARE_STATEMENT_PATTERN = """^\s*@prepare\[([^]]+)\]\s*=\s*([^;]+)$""".r
+  val REMOVE_PREPARE_STATEMENT_PATTERN = """^\s*@remove_prepare\[([^]]+)\]\s*$""".r
+
+  val BIND_PATTERN = """^\s*@bind\[([^]]+)\](?:=([^;]+))?""".r
+  val BATCH_PATTERN = """^(?i)\s*BEGIN\s+(UNLOGGED|COUNTER)?\s*BATCH""".r
+
+  // Keywords that may start a generic CQL statement. The pattern is assembled from two
+  // plain string pieces so that no line break or indentation ends up inside the regex.
+  val GENERIC_STATEMENT_PREFIX =
+    ("""(?is)\s*(?:INSERT|UPDATE|DELETE|SELECT|CREATE|ALTER|""" +
+      """DROP|GRANT|REVOKE|TRUNCATE|LIST|USE)\s+""").r
+
+  val VALID_IDENTIFIER = "[a-z][a-z0-9_]*"
+
+  val DESCRIBE_CLUSTER_PATTERN = """^(?i)\s*(?:DESCRIBE|DESC)\s+CLUSTER;\s*$""".r
+  val DESCRIBE_KEYSPACES_PATTERN = """^(?i)\s*(?:DESCRIBE|DESC)\s+KEYSPACES;\s*$""".r
+  val DESCRIBE_TABLES_PATTERN = """^(?i)\s*(?:DESCRIBE|DESC)\s+TABLES;\s*$""".r
+  val DESCRIBE_KEYSPACE_PATTERN = ("""^(?i)\s*(?:DESCRIBE|DESC)\s+KEYSPACE\s*("""+VALID_IDENTIFIER+""");\s*$""").r
+  val DESCRIBE_TABLE_PATTERN = ("""^(?i)\s*(?:DESCRIBE|DESC)\s+TABLE\s*("""+VALID_IDENTIFIER+""");\s*$""").r
+  val DESCRIBE_TABLE_WITH_KEYSPACE_PATTERN = ("""^(?i)\s*(?:DESCRIBE|DESC)\s+TABLE\s*(""" +
+                                                VALID_IDENTIFIER +
+                                                """)\.(""" +
+                                                VALID_IDENTIFIER +
+                                                """);\s*$""").r
+
+  val DESCRIBE_TYPE_PATTERN = ("""^(?i)\s*(?:DESCRIBE|DESC)\s+TYPE\s*("""+VALID_IDENTIFIER+""");\s*$""").r
+  val DESCRIBE_TYPE_WITH_KEYSPACE_PATTERN = ("""^(?i)\s*(?:DESCRIBE|DESC)\s+TYPE\s*(""" +
+                                                VALID_IDENTIFIER +
+                                                """)\.(""" +
+                                                VALID_IDENTIFIER +
+                                                """);\s*$""").r
+
+  val HELP_PATTERN = """^(?i)\s*HELP;\s*$""".r
+}
+
+/**
+ * Parser combinators that split a paragraph into blocks: comments, query
+ * parameters, statements, batches, DESCRIBE commands and the HELP command.
+ *
+ * Each combinator first recognises a block with a loose regular expression,
+ * then delegates to an `extract*` method that validates the text against the
+ * strict pattern of the companion object and throws an InterpreterException
+ * when the syntax is invalid.
+ */
+class ParagraphParser extends RegexParsers{
+
+
+  import ParagraphParser._
+
+  // Only the leading '#' marker is removed, so '#' characters inside the comment text are kept
+  def singleLineComment: Parser[Comment] = """\s*#.*""".r ^^ {case text => Comment(text.trim.replaceFirst("#",""))}
+  // Reluctant quantifier: the comment stops at the first closing marker instead of the last one,
+  // so statements located between two block comments are no longer swallowed
+  def multiLineComment: Parser[Comment] = """(?s)/\*(.*?)\*/""".r ^^ {case text => Comment(text.trim.replaceAll("""/\*""","").replaceAll("""\*/""",""))}
+
+  //Query parameters
+  def consistency: Parser[Consistency] = """\s*@consistency.+""".r ^^ {case x => extractConsistency(x.trim)}
+  def serialConsistency: Parser[SerialConsistency] = """\s*@serialConsistency.+""".r ^^ {case x => extractSerialConsistency(x.trim)}
+  def timestamp: Parser[Timestamp] = """\s*@timestamp.+""".r ^^ {case x => extractTimestamp(x.trim)}
+  def retryPolicy: Parser[RetryPolicy] = """\s*@retryPolicy.+""".r ^^ {case x => extractRetryPolicy(x.trim)}
+  def fetchSize: Parser[FetchSize] = """\s*@fetchSize.+""".r ^^ {case x => extractFetchSize(x.trim)}
+
+  //Statements
+  def genericStatement: Parser[SimpleStm] = s"""$GENERIC_STATEMENT_PREFIX[^;]+;""".r ^^ {case x => extractSimpleStatement(x.trim)}
+  def prepare: Parser[PrepareStm] = """\s*@prepare.+""".r ^^ {case x => extractPreparedStatement(x.trim)}
+  def removePrepare: Parser[RemovePrepareStm] = """\s*@remove_prepare.+""".r ^^ {case x => extractRemovePreparedStatement(x.trim)}
+  def bind: Parser[BoundStm] = """\s*@bind.+""".r ^^ {case x => extractBoundStatement(x.trim)}
+
+
+  //Meta data
+  private def describeCluster: Parser[DescribeClusterCmd] = """(?i)\s*(?:DESCRIBE|DESC)\s+CLUSTER.*""".r ^^ {extractDescribeClusterCmd(_)}
+  private def describeKeyspaces: Parser[DescribeKeyspacesCmd] = """(?i)\s*(?:DESCRIBE|DESC)\s+KEYSPACES.*""".r ^^ {extractDescribeKeyspacesCmd(_)}
+  private def describeTables: Parser[DescribeTablesCmd] = """(?i)\s*(?:DESCRIBE|DESC)\s+TABLES.*""".r ^^ {extractDescribeTablesCmd(_)}
+  private def describeKeyspace: Parser[DescribeKeyspaceCmd] = """\s*(?i)(?:DESCRIBE|DESC)\s+KEYSPACE\s+.+""".r ^^ {extractDescribeKeyspaceCmd(_)}
+  private def describeTable: Parser[DescribeTableCmd] = """(?i)\s*(?:DESCRIBE|DESC)\s+TABLE\s+.+""".r ^^ {extractDescribeTableCmd(_)}
+  private def describeType: Parser[DescribeUDTCmd] = """(?i)\s*(?:DESCRIBE|DESC)\s+TYPE\s+.*""".r ^^ {extractDescribeTypeCmd(_)}
+
+  //Help
+  private def helpCommand: Parser[HelpCmd] = """(?i)\s*HELP.*""".r ^^{extractHelpCmd(_)}
+
+  //Batch: BEGIN ... BATCH, then any number of mutations or bound statements, then APPLY BATCH;
+  private def beginBatch: Parser[String] = """(?i)\s*BEGIN\s+(UNLOGGED|COUNTER)?\s*BATCH""".r
+  private def applyBatch: Parser[String] = """(?i)APPLY BATCH;""".r
+  private def insert: Parser[SimpleStm] = """(?i)INSERT [^;]+;""".r ^^{SimpleStm(_)}
+  private def update: Parser[SimpleStm] = """(?i)UPDATE [^;]+;""".r ^^{SimpleStm(_)}
+  private def delete: Parser[SimpleStm] = """(?i)DELETE [^;]+;""".r ^^{SimpleStm(_)}
+
+  private def mutationStatements: Parser[List[QueryStatement]] = rep(insert | update | delete | bind)
+
+  def batch: Parser[BatchStm] = beginBatch ~ mutationStatements ~ applyBatch ^^ {
+    case begin ~ cqls ~ end => BatchStm(extractBatchType(begin),cqls)}
+
+  /** Entry point: parses a whole paragraph into the ordered list of its blocks. */
+  def queries:Parser[List[AnyBlock]] = rep(singleLineComment | multiLineComment | consistency | serialConsistency |
+    timestamp | retryPolicy | fetchSize | removePrepare | prepare | bind | batch | describeCluster | describeKeyspaces |
+    describeTables | describeKeyspace | describeTable | describeType | helpCommand | genericStatement)
+
+  /** Extracts the consistency level of an `@consistency=` line or throws InterpreterException. */
+  def extractConsistency(text: String): Consistency = {
+    text match {
+      case CONSISTENCY_LEVEL_PATTERN(consistency) => Consistency(ConsistencyLevel.valueOf(consistency))
+      case _ => throw new InterpreterException(s"Invalid syntax for @consistency. " +
+        s"It should comply to the pattern ${CONSISTENCY_LEVEL_PATTERN.toString}")
+    }
+  }
+
+  /** Extracts the serial consistency level of an `@serialConsistency=` line or throws InterpreterException. */
+  def extractSerialConsistency(text: String): SerialConsistency = {
+    text match {
+      case SERIAL_CONSISTENCY_LEVEL_PATTERN(consistency) => SerialConsistency(ConsistencyLevel.valueOf(consistency))
+      case _ => throw new InterpreterException(s"Invalid syntax for @serialConsistency. " +
+        s"It should comply to the pattern ${SERIAL_CONSISTENCY_LEVEL_PATTERN.toString}")
+    }
+  }
+
+  /** Extracts the numeric value of an `@timestamp=` line or throws InterpreterException. */
+  def extractTimestamp(text: String): Timestamp = {
+    text match {
+      case TIMESTAMP_PATTERN(timestamp) => Timestamp(timestamp.trim.toLong)
+      case _ => throw new InterpreterException(s"Invalid syntax for @timestamp. " +
+        s"It should comply to the pattern ${TIMESTAMP_PATTERN.toString}")
+    }
+  }
+
+  /** Maps the name given in an `@retryPolicy=` line to its retry policy block or throws InterpreterException. */
+  def extractRetryPolicy(text: String): RetryPolicy = {
+    text match {
+      case RETRY_POLICIES_PATTERN(retry) => retry.trim match {
+        case DEFAULT_POLICY => DefaultRetryPolicy
+        case DOWNGRADING_CONSISTENCY_RETRY => DowngradingRetryPolicy
+        case FALLTHROUGH_RETRY => FallThroughRetryPolicy
+        case LOGGING_DEFAULT_RETRY => LoggingDefaultRetryPolicy
+        case LOGGING_DOWNGRADING_RETRY => LoggingDowngradingRetryPolicy
+        case LOGGING_FALLTHROUGH_RETRY => LoggingFallThroughRetryPolicy
+      }
+      case _ => throw new InterpreterException(s"Invalid syntax for @retryPolicy. " +
+        s"It should comply to the pattern ${RETRY_POLICIES_PATTERN.toString}")
+    }
+  }
+
+  /** Extracts the numeric value of an `@fetchSize=` line or throws InterpreterException. */
+  def extractFetchSize(text: String): FetchSize = {
+    text match {
+      case FETCHSIZE_PATTERN(fetchSize) => FetchSize(fetchSize.trim.toInt)
+      case _ => throw new InterpreterException(s"Invalid syntax for @fetchSize. " +
+        s"It should comply to the pattern ${FETCHSIZE_PATTERN.toString}")
+    }
+  }
+
+  /** Wraps a semi-colon terminated statement into a SimpleStm or throws InterpreterException. */
+  def extractSimpleStatement(text: String): SimpleStm = {
+    text match {
+      case SIMPLE_STATEMENT_PATTERN(statement) => SimpleStm(statement)
+      case _ => throw new InterpreterException(s"Invalid statement '$text'. Did you forget to add ; (semi-colon) at the end of each CQL statement ?")
+    }
+  }
+
+  /** Extracts the name and query string of an `@prepare[name]=query` line or throws InterpreterException. */
+  def extractPreparedStatement(text: String): PrepareStm = {
+    text match {
+      case PREPARE_STATEMENT_PATTERN(name,queryString) => PrepareStm(name.trim,queryString.trim)
+      case _ => throw new InterpreterException(s"Invalid syntax for @prepare. " +
+        s"It should comply to the pattern: @prepare[prepared_statement_name]=CQL Statement (without semi-colon)")
+    }
+  }
+
+  /** Extracts the statement name of an `@remove_prepare[name]` line or throws InterpreterException. */
+  def extractRemovePreparedStatement(text: String): RemovePrepareStm= {
+    text match {
+      case REMOVE_PREPARE_STATEMENT_PATTERN(name) => RemovePrepareStm(name.trim)
+      case _ => throw new InterpreterException(s"Invalid syntax for @remove_prepare. " +
+        s"It should comply to the pattern: @remove_prepare[prepared_statement_name]")
+    }
+  }
+
+  /**
+   * Extracts the statement name and raw bound values of an `@bind[name]=values` line.
+   * The bound values default to the empty string when none are given.
+   */
+  def extractBoundStatement(text: String): BoundStm = {
+    text match {
+      case BIND_PATTERN(name,boundValues) => BoundStm(name.trim, Option(boundValues).map(_.trim).getOrElse(""))
+      case _ => throw new InterpreterException("Invalid syntax for @bind. It should comply to the pattern: " +
+        "@bind[prepared_statement_name]=10,'jdoe','John DOE',12345,'2015-07-32 12:04:23.234' " +
+        "OR @bind[prepared_statement_name] with no bound value. No semi-colon")
+    }
+  }
+
+  /** Resolves the batch type of a BEGIN ... BATCH header; LOGGED is used when no type is given. */
+  def extractBatchType(text: String): BatchStatement.Type = {
+    text match {
+      case BATCH_PATTERN(batchType) =>
+        val inferredType = Option(batchType).getOrElse("LOGGED")
+        BatchStatement.Type.valueOf(inferredType.toUpperCase)
+      case _ => throw new InterpreterException(s"Invalid syntax for BEGIN BATCH. " +
+        s"""It should comply to the pattern: ${BATCH_PATTERN.toString}""")
+    }
+  }
+
+  def extractDescribeClusterCmd(text: String): DescribeClusterCmd = {
+    text match {
+      case DESCRIBE_CLUSTER_PATTERN() => new DescribeClusterCmd
+      case _ => throw new InterpreterException(s"Invalid syntax for DESCRIBE CLUSTER. " +
+        s"""It should comply to the pattern: ${DESCRIBE_CLUSTER_PATTERN.toString}""")
+    }
+  }
+
+  def extractDescribeKeyspacesCmd(text: String): DescribeKeyspacesCmd = {
+    text match {
+        case DESCRIBE_KEYSPACES_PATTERN() => new DescribeKeyspacesCmd
+        case _ => throw new InterpreterException(s"Invalid syntax for DESCRIBE KEYSPACES. " +
+          s"""It should comply to the pattern: ${DESCRIBE_KEYSPACES_PATTERN.toString}""")
+      }
+  }
+
+  def extractDescribeTablesCmd(text: String): DescribeTablesCmd = {
+    text match {
+      case DESCRIBE_TABLES_PATTERN() => new DescribeTablesCmd
+      case _ => throw new InterpreterException(s"Invalid syntax for DESCRIBE TABLES. " +
+        s"""It should comply to the pattern: ${DESCRIBE_TABLES_PATTERN.toString}""")
+    }
+  }
+
+  def extractDescribeKeyspaceCmd(text: String): DescribeKeyspaceCmd = {
+    text match {
+      case DESCRIBE_KEYSPACE_PATTERN(keyspace) => new DescribeKeyspaceCmd(keyspace)
+      case _ => throw new InterpreterException(s"Invalid syntax for DESCRIBE KEYSPACE. " +
+        s"""It should comply to the pattern: ${DESCRIBE_KEYSPACE_PATTERN.toString}""")
+    }
+  }
+
+  // The keyspace-qualified form is tried first, then the bare table name
+  def extractDescribeTableCmd(text: String): DescribeTableCmd = {
+    text match {
+      case DESCRIBE_TABLE_WITH_KEYSPACE_PATTERN(keyspace,table) => new DescribeTableCmd(Option(keyspace),table)
+      case DESCRIBE_TABLE_PATTERN(table) => new DescribeTableCmd(Option.empty,table)
+      case _ => throw new InterpreterException(s"Invalid syntax for DESCRIBE TABLE. " +
+       s"""It should comply to the patterns: ${DESCRIBE_TABLE_WITH_KEYSPACE_PATTERN.toString} or ${DESCRIBE_TABLE_PATTERN.toString}""".stripMargin)
+    }
+  }
+
+  // The keyspace-qualified form is tried first, then the bare type name
+  def extractDescribeTypeCmd(text: String): DescribeUDTCmd = {
+    text match {
+      case DESCRIBE_TYPE_WITH_KEYSPACE_PATTERN(keyspace,table) => new DescribeUDTCmd(Option(keyspace),table)
+      case DESCRIBE_TYPE_PATTERN(table) => new DescribeUDTCmd(Option.empty,table)
+      case _ => throw new InterpreterException(s"Invalid syntax for DESCRIBE TYPE. " +
+        s"""It should comply to the patterns: ${DESCRIBE_TYPE_WITH_KEYSPACE_PATTERN.toString} or ${DESCRIBE_TYPE_PATTERN.toString}""".stripMargin)
+    }
+  }
+
+  def extractHelpCmd(text: String): HelpCmd = {
+    text match {
+      case HELP_PATTERN() => new HelpCmd
+      case _ => throw new InterpreterException(s"Invalid syntax for HELP. " +
+        s"""It should comply to the patterns: ${HELP_PATTERN.toString}""".stripMargin)
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b9583c6e/cassandra/src/main/scala/org/apache/zeppelin/cassandra/TextBlockHierarchy.scala
----------------------------------------------------------------------
diff --git a/cassandra/src/main/scala/org/apache/zeppelin/cassandra/TextBlockHierarchy.scala b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/TextBlockHierarchy.scala
new file mode 100644
index 0000000..70b2ce2
--- /dev/null
+++ b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/TextBlockHierarchy.scala
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.cassandra
+
+import com.datastax.driver.core._
+
+/**
+ * Scala object hierarchy used to represent parsed input text.
+ *
+ * Every block extends `AnyBlock` and is tagged with a `BlockType`.
+ * Parameter blocks additionally carry a `ParameterType` tag and
+ * statement blocks a `StatementType` tag.
+ */
+object TextBlockHierarchy {
+
+  /** Tag identifying the category of a block. */
+  sealed trait BlockType
+  object ParameterBlock extends BlockType
+  object StatementBlock extends BlockType
+  object DescribeBlock extends BlockType
+  object CommentBlock extends BlockType
+
+  /**
+   * Root class of all blocks.
+   *
+   * @param blockType category tag of this block
+   */
+  abstract class AnyBlock(val blockType: BlockType) {
+
+    /**
+     * View this block as the requested subtype `U`.
+     * This is a plain cast: nothing here verifies that the instance really is a `U`.
+     */
+    def get[U <: AnyBlock]: U = this.asInstanceOf[U]
+  }
+
+  /** A comment block holding its text. */
+  case class Comment(text: String) extends AnyBlock(CommentBlock)
+
+  /** Tag identifying the kind of a query parameter. */
+  sealed trait ParameterType
+  object ConsistencyParam extends ParameterType
+  object SerialConsistencyParam extends ParameterType
+  object TimestampParam extends ParameterType
+  object RetryPolicyParam extends ParameterType
+  object FetchSizeParam extends ParameterType
+
+  /**
+   * Base class of all query parameters; always tagged as a `ParameterBlock`.
+   *
+   * @param paramType kind tag of this parameter
+   */
+  abstract class QueryParameters(val paramType: ParameterType) extends AnyBlock(ParameterBlock) {
+
+    /**
+     * View this parameter as the requested subtype `U`.
+     * This is a plain cast: nothing here verifies that the instance really is a `U`.
+     */
+    def getParam[U <: QueryParameters]: U = this.asInstanceOf[U]
+  }
+
+  /** Consistency level parameter. */
+  case class Consistency(value: ConsistencyLevel) extends QueryParameters(ConsistencyParam)
+
+  /** Serial consistency level parameter. */
+  case class SerialConsistency(value: ConsistencyLevel)
+    extends QueryParameters(SerialConsistencyParam)
+
+  /** Timestamp parameter, held as a Long. */
+  case class Timestamp(value: Long) extends QueryParameters(TimestampParam)
+
+  /** Fetch size parameter, held as an Int. */
+  case class FetchSize(value: Int) extends QueryParameters(FetchSizeParam)
+
+  /** Base class of the retry policy parameters listed below. */
+  abstract class RetryPolicy extends QueryParameters(RetryPolicyParam)
+
+  object DefaultRetryPolicy extends RetryPolicy
+  object DowngradingRetryPolicy extends RetryPolicy
+  object FallThroughRetryPolicy extends RetryPolicy
+  object LoggingDefaultRetryPolicy extends RetryPolicy
+  object LoggingDowngradingRetryPolicy extends RetryPolicy
+  object LoggingFallThroughRetryPolicy extends RetryPolicy
+
+  /** Tag identifying the kind of a statement. */
+  sealed trait StatementType
+  object PrepareStatementType extends StatementType
+  object RemovePrepareStatementType extends StatementType
+  object BoundStatementType extends StatementType
+  object SimpleStatementType extends StatementType
+  object BatchStatementType extends StatementType
+  object DescribeClusterStatementType extends StatementType
+  object DescribeAllKeyspacesStatementType extends StatementType
+  object DescribeKeyspaceStatementType extends StatementType
+  object DescribeAllTablesStatementType extends StatementType
+  object DescribeTableStatementType extends StatementType
+  object DescribeTypeStatementType extends StatementType
+  object HelpStatementType extends StatementType
+
+  /**
+   * Base class of all statements; always tagged as a `StatementBlock`.
+   *
+   * @param statementType kind tag of this statement
+   */
+  abstract class QueryStatement(val statementType: StatementType)
+    extends AnyBlock(StatementBlock) {
+
+    /**
+     * View this statement as the requested subtype `U`.
+     * This is a plain cast: nothing here verifies that the instance really is a `U`.
+     */
+    def getStatement[U <: QueryStatement]: U = this.asInstanceOf[U]
+  }
+
+  /** A simple statement holding its text. */
+  case class SimpleStm(text: String) extends QueryStatement(SimpleStatementType)
+
+  /** A prepare statement: a name and the associated query. */
+  case class PrepareStm(name: String, query: String)
+    extends QueryStatement(PrepareStatementType)
+
+  /** A remove-prepare statement holding a name. */
+  case class RemovePrepareStm(name: String) extends QueryStatement(RemovePrepareStatementType)
+
+  /** A bound statement: a name and its values, kept as a single String. */
+  case class BoundStm(name: String, values: String) extends QueryStatement(BoundStatementType)
+
+  /** A batch statement: a batch type and the list of statements it contains. */
+  case class BatchStm(batchType: BatchStatement.Type, statements: List[QueryStatement])
+    extends QueryStatement(BatchStatementType)
+
+  /** Mixin for the DESCRIBE commands, exposing the command as text. */
+  sealed trait DescribeCommandStatement {
+    val statement: String
+  }
+
+  /** DESCRIBE CLUSTER command; `statement` defaults to "DESCRIBE CLUSTER;". */
+  class DescribeClusterCmd(override val statement: String = "DESCRIBE CLUSTER;")
+    extends QueryStatement(DescribeClusterStatementType)
+    with DescribeCommandStatement
+
+  /** DESCRIBE KEYSPACES command; `statement` defaults to "DESCRIBE KEYSPACES;". */
+  class DescribeKeyspacesCmd(override val statement: String = "DESCRIBE KEYSPACES;")
+    extends QueryStatement(DescribeAllKeyspacesStatementType)
+    with DescribeCommandStatement
+
+  /** DESCRIBE TABLES command; `statement` defaults to "DESCRIBE TABLES;". */
+  class DescribeTablesCmd(override val statement: String = "DESCRIBE TABLES;")
+    extends QueryStatement(DescribeAllTablesStatementType)
+    with DescribeCommandStatement
+
+  /** DESCRIBE KEYSPACE command for the given keyspace. */
+  case class DescribeKeyspaceCmd(keyspace: String)
+    extends QueryStatement(DescribeKeyspaceStatementType)
+    with DescribeCommandStatement {
+
+    override val statement: String = s"DESCRIBE KEYSPACE $keyspace;"
+  }
+
+  /**
+   * DESCRIBE TABLE command.
+   * The table name is prefixed with "keyspace." in `statement` only when a keyspace is defined.
+   */
+  case class DescribeTableCmd(keyspace: Option[String], table: String)
+    extends QueryStatement(DescribeTableStatementType)
+    with DescribeCommandStatement {
+
+    override val statement: String =
+      keyspace
+        .map(ks => s"DESCRIBE TABLE $ks.$table;")
+        .getOrElse(s"DESCRIBE TABLE $table;")
+  }
+
+  /**
+   * DESCRIBE TYPE command.
+   * The type name is prefixed with "keyspace." in `statement` only when a keyspace is defined.
+   */
+  case class DescribeUDTCmd(keyspace: Option[String], udtName: String)
+    extends QueryStatement(DescribeTypeStatementType)
+    with DescribeCommandStatement {
+
+    override val statement: String =
+      keyspace
+        .map(ks => s"DESCRIBE TYPE $ks.$udtName;")
+        .getOrElse(s"DESCRIBE TYPE $udtName;")
+  }
+
+  /** HELP command; carries no data. */
+  class HelpCmd extends QueryStatement(HelpStatementType)
+
+}