You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by mo...@apache.org on 2015/08/05 08:17:05 UTC
[4/6] incubator-zeppelin git commit: ZEPPELIN-179: Cassandra
Interpreter
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b9583c6e/cassandra/src/main/scala/org/apache/zeppelin/cassandra/EnhancedSession.scala
----------------------------------------------------------------------
diff --git a/cassandra/src/main/scala/org/apache/zeppelin/cassandra/EnhancedSession.scala b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/EnhancedSession.scala
new file mode 100644
index 0000000..c636de9
--- /dev/null
+++ b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/EnhancedSession.scala
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.cassandra
+
+import com.datastax.driver.core._
+import org.apache.zeppelin.cassandra.TextBlockHierarchy._
+import org.apache.zeppelin.interpreter.InterpreterException
+
+
+/**
+ * Wrapper around the Java driver session that adds schema description
+ * commands (cluster, keyspaces, tables, single keyspace, single table,
+ * user-defined type) and a help command. The output of those commands is
+ * an HTML string prefixed with the "%html" display marker, whereas regular
+ * driver statements are passed straight to the wrapped session.
+ *
+ * @param session the Java driver session being wrapped
+ */
+class EnhancedSession(val session: Session) {
+
+ val clusterDisplay = DisplaySystem.ClusterDisplay
+ val keyspaceDisplay = DisplaySystem.KeyspaceDisplay
+ val tableDisplay = DisplaySystem.TableDisplay
+ val udtDisplay = DisplaySystem.UDTDisplay
+ val helpDisplay = DisplaySystem.HelpDisplay
+ private val noResultDisplay = DisplaySystem.NoResultDisplay
+
+
+ // Display marker prepended to every HTML payload produced by this class
+ val HTML_MAGIC = "%html \n"
+
+ // Must stay below HTML_MAGIC and noResultDisplay: it reads both at construction time
+ val displayNoResult: String = HTML_MAGIC + noResultDisplay.formatNoResult
+
+ /**
+  * Render the execution information of a query that returned no row.
+  *
+  * @param query text of the executed query
+  * @param execInfo execution information reported by the driver
+  * @return HTML payload prefixed with the display marker
+  */
+ def displayExecutionStatistics(query: String, execInfo: ExecutionInfo): String = {
+   HTML_MAGIC + noResultDisplay.noResultWithExecutionInfo(query, execInfo)
+ }
+
+ private def execute(describeCluster: DescribeClusterCmd): String = {
+   val metaData = session.getCluster.getMetadata
+   HTML_MAGIC + clusterDisplay.formatClusterOnly(describeCluster.statement, metaData)
+ }
+
+ private def execute(describeKeyspaces: DescribeKeyspacesCmd): String = {
+   val metaData = session.getCluster.getMetadata
+   HTML_MAGIC + clusterDisplay.formatClusterContent(describeKeyspaces.statement, metaData)
+ }
+
+ private def execute(describeTables: DescribeTablesCmd): String = {
+   val metadata: Metadata = session.getCluster.getMetadata
+   HTML_MAGIC + clusterDisplay.formatAllTables(describeTables.statement,metadata)
+ }
+
+ private def execute(describeKeyspace: DescribeKeyspaceCmd): String = {
+   val keyspace: String = describeKeyspace.keyspace
+   // The driver metadata lookup can yield null for an unknown keyspace; report it
+   // the same way unknown tables and types are reported instead of passing null on.
+   Option(session.getCluster.getMetadata.getKeyspace(keyspace)) match {
+     case Some(ksMeta) => HTML_MAGIC + keyspaceDisplay.formatKeyspaceContent(describeKeyspace.statement, ksMeta)
+     case None => throw new InterpreterException(s"Cannot find keyspace $keyspace")
+   }
+ }
+
+ private def execute(describeTable: DescribeTableCmd): String = {
+   val metaData = session.getCluster.getMetadata
+   val tableName: String = describeTable.table
+   // Explicit keyspace first, then the session's logged keyspace, then "system"
+   val keyspace: String = describeTable.keyspace.orElse(Option(session.getLoggedKeyspace)).getOrElse("system")
+
+   Option(metaData.getKeyspace(keyspace)).flatMap(ks => Option(ks.getTable(tableName))) match {
+     case Some(tableMeta) => HTML_MAGIC + tableDisplay.format(describeTable.statement, tableMeta, true)
+     case None => throw new InterpreterException(s"Cannot find table $keyspace.$tableName")
+   }
+ }
+
+ private def execute(describeUDT: DescribeUDTCmd): String = {
+   val metaData = session.getCluster.getMetadata
+   // Explicit keyspace first, then the session's logged keyspace, then "system"
+   val keyspace: String = describeUDT.keyspace.orElse(Option(session.getLoggedKeyspace)).getOrElse("system")
+   val udtName: String = describeUDT.udtName
+
+   Option(metaData.getKeyspace(keyspace)).flatMap(ks => Option(ks.getUserType(udtName))) match {
+     case Some(userType) => HTML_MAGIC + udtDisplay.format(describeUDT.statement, userType, true)
+     case None => throw new InterpreterException(s"Cannot find type $keyspace.$udtName")
+   }
+ }
+
+ private def execute(helpCmd: HelpCmd): String = {
+   HTML_MAGIC + helpDisplay.formatHelp()
+ }
+
+
+ /**
+  * Execute a describe/help command or a regular driver statement.
+  *
+  * @param st a describe command, a help command or a driver Statement
+  * @return an HTML String for describe/help commands, the driver's result
+  *         for a Statement
+  * @throws InterpreterException when the argument is of any other type, or
+  *         when the described keyspace, table or type cannot be found
+  */
+ def execute(st: Any): Any = {
+   st match {
+     case x:DescribeClusterCmd => execute(x)
+     case x:DescribeKeyspacesCmd => execute(x)
+     case x:DescribeTablesCmd => execute(x)
+     case x:DescribeKeyspaceCmd => execute(x)
+     case x:DescribeTableCmd => execute(x)
+     case x:DescribeUDTCmd => execute(x)
+     case x:HelpCmd => execute(x)
+     case x:Statement => session.execute(x)
+     case _ => throw new InterpreterException(s"Cannot execute statement '$st' of type ${st.getClass}")
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b9583c6e/cassandra/src/main/scala/org/apache/zeppelin/cassandra/InterpreterLogic.scala
----------------------------------------------------------------------
diff --git a/cassandra/src/main/scala/org/apache/zeppelin/cassandra/InterpreterLogic.scala b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/InterpreterLogic.scala
new file mode 100644
index 0000000..809bce7
--- /dev/null
+++ b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/InterpreterLogic.scala
@@ -0,0 +1,411 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.cassandra
+
+import java.io.{ByteArrayOutputStream, PrintStream}
+import java.net.InetAddress
+import java.nio.ByteBuffer
+import java.text.SimpleDateFormat
+import java.util
+import java.util.Date
+import java.util.concurrent.ConcurrentHashMap
+
+import com.datastax.driver.core.DataType.Name._
+import com.datastax.driver.core._
+import com.datastax.driver.core.exceptions.DriverException
+import com.datastax.driver.core.policies.{LoggingRetryPolicy, FallthroughRetryPolicy, DowngradingConsistencyRetryPolicy, Policies}
+import org.apache.zeppelin.cassandra.TextBlockHierarchy._
+import org.apache.zeppelin.display.Input.ParamOption
+import org.apache.zeppelin.interpreter.InterpreterResult.Code
+import org.apache.zeppelin.interpreter.{InterpreterException, InterpreterResult, InterpreterContext}
+import org.slf4j.LoggerFactory
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+
+/**
+ * Value object holding the runtime query parameters extracted from a
+ * paragraph. Every field is optional: InterpreterLogic.applyQueryOptions
+ * only changes a statement setting when the matching field is defined.
+ * @param consistency consistency level to set on the generated statements
+ * @param serialConsistency serial consistency level to set on the generated statements
+ * @param timestamp default timestamp to set on the generated statements
+ * @param retryPolicy retry policy selector, translated to a driver retry policy when applied
+ * @param fetchSize fetch size to set on the generated statements
+ */
+case class CassandraQueryOptions(consistency: Option[ConsistencyLevel],
+ serialConsistency:Option[ConsistencyLevel],
+ timestamp: Option[Long],
+ retryPolicy: Option[RetryPolicy],
+ fetchSize: Option[Int])
+
+/**
+ * Singleton object holding the constants and the state shared by all
+ * InterpreterLogic instances:
+ *  - the regular expressions recognising double-curly-brace variable
+ *    placeholders, in their "name=default" and "name=choice1|choice2" forms,
+ *    plus the separator used to split the choices
+ *  - the two date formats accepted for bound timestamp values
+ *  - one driver retry policy instance per supported policy, with a
+ *    LoggingRetryPolicy wrapper for each of them
+ *  - the registry of prepared statements keyed by name, backed by a
+ *    ConcurrentHashMap
+ *  - the logger and the paragraph / bound values parsers
+ */
+object InterpreterLogic {
+
+ val CHOICES_SEPARATOR : String = """\|"""
+ val VARIABLE_PATTERN = """\{\{[^}]+\}\}""".r
+ val SIMPLE_VARIABLE_DEFINITION_PATTERN = """\{\{([^=]+)=([^=]+)\}\}""".r
+ val MULTIPLE_CHOICES_VARIABLE_DEFINITION_PATTERN = """\{\{([^=]+)=((?:[^=]+\|)+[^|]+)\}\}""".r
+
+ val STANDARD_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss"
+ val ACCURATE_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss.SSS"
+
+ val defaultRetryPolicy = Policies.defaultRetryPolicy()
+ val downgradingConsistencyRetryPolicy = DowngradingConsistencyRetryPolicy.INSTANCE
+ val fallThroughRetryPolicy = FallthroughRetryPolicy.INSTANCE
+ val loggingDefaultRetryPolicy = new LoggingRetryPolicy(defaultRetryPolicy)
+ val loggingDownGradingRetryPolicy = new LoggingRetryPolicy(downgradingConsistencyRetryPolicy)
+ val loggingFallThrougRetryPolicy = new LoggingRetryPolicy(fallThroughRetryPolicy)
+
+ val preparedStatements : mutable.Map[String,PreparedStatement] = new ConcurrentHashMap[String,PreparedStatement]().asScala
+
+ val logger = LoggerFactory.getLogger(classOf[InterpreterLogic])
+
+ val paragraphParser = new ParagraphParser
+ val boundValuesParser = new BoundValuesParser
+
+}
+
+/**
+ * Real class to implement the
+ * interpreting logic of CQL statements
+ * and parameters blocks
+ *
+ * @param session java driver session
+ */
+class InterpreterLogic(val session: Session) {
+
+ val enhancedSession: EnhancedSession = new EnhancedSession(session)
+
+ import InterpreterLogic._
+
+ /**
+  * Parse a paragraph, execute every statement it contains and render the
+  * outcome of the last executed one.
+  *
+  * Steps, in order: parse the input into blocks, extract the query options
+  * from the parameter blocks, remove then register prepared statements,
+  * build the executable statements, run them all and format the last result.
+  *
+  * @param session session used to read the protocol version and to prepare statements
+  * @param stringStatements raw text of the paragraph
+  * @param context interpreter context, used to resolve variables
+  * @return a SUCCESS result with the rendered output, or an ERROR result
+  *         describing the failure (no exception escapes for java.lang.Exception)
+  */
+ def interpret(session:Session, stringStatements : String, context: InterpreterContext): InterpreterResult = {
+
+   logger.info(s"Executing CQL statements : \n\n$stringStatements\n")
+
+   try {
+     val protocolVersion = session.getCluster.getConfiguration.getProtocolOptions.getProtocolVersionEnum
+     val blocks: List[AnyBlock] = parseInput(stringStatements)
+
+     val parameterBlocks = blocks.collect { case block if block.blockType == ParameterBlock => block.get[QueryParameters] }
+     val queryOptions = extractQueryOptions(parameterBlocks)
+     logger.info(s"Current Cassandra query options = $queryOptions")
+
+     val queryStatements = blocks.collect { case block if block.blockType == StatementBlock => block.get[QueryStatement] }
+
+     // Removals are handled before registrations
+     queryStatements
+       .collect { case st if st.statementType == RemovePrepareStatementType => st.getStatement[RemovePrepareStm] }
+       .foreach { removal =>
+         logger.debug(s"Removing prepared statement '${removal.name}'")
+         preparedStatements.remove(removal.name)
+       }
+
+     // Register the prepared statements that are not known yet
+     queryStatements
+       .collect { case st if st.statementType == PrepareStatementType => st.getStatement[PrepareStm] }
+       .foreach { prepare =>
+         logger.debug(s"Get or prepare statement '${prepare.name}' : ${prepare.query}")
+         preparedStatements.getOrElseUpdate(prepare.name,session.prepare(prepare.query))
+       }
+
+     // Everything that is neither a prepare nor a remove-prepare gets executed
+     val executables: List[Any] = queryStatements
+       .filterNot(st => st.statementType == PrepareStatementType || st.statementType == RemovePrepareStatementType)
+       .map {
+         case simple: SimpleStm => generateSimpleStatement(simple, queryOptions, context)
+         case batch: BatchStm =>
+           val members: List[Statement] = batch.statements.map {
+             case inner: SimpleStm => generateSimpleStatement(inner, queryOptions, context)
+             case inner: BoundStm => generateBoundStatement(inner, queryOptions, context)
+             case _ => throw new InterpreterException(s"Unknown statement type")
+           }
+           generateBatchStatement(batch.batchType, queryOptions, members)
+         case bound: BoundStm => generateBoundStatement(bound, queryOptions, context)
+         case describe: DescribeCommandStatement => describe
+         case help: HelpCmd => help
+         case unknown => throw new InterpreterException(s"Unknown statement type : ${unknown}")
+       }
+
+     val results: List[(Any,Any)] = executables.map(executable => (enhancedSession.execute(executable), executable))
+
+     // Only the outcome of the last statement is rendered
+     results.lastOption match {
+       case Some((res: ResultSet, st: Statement)) => buildResponseMessage((res, st), protocolVersion)
+       case Some((output: String, _)) => new InterpreterResult(Code.SUCCESS, output)
+       case Some(other) => throw new InterpreterException(s"Cannot parse result type : ${other}")
+       case None => new InterpreterResult(Code.SUCCESS, enhancedSession.displayNoResult)
+     }
+
+   } catch {
+     case dex: DriverException =>
+       logger.error(dex.getMessage, dex)
+       new InterpreterResult(Code.ERROR, parseException(dex))
+     case pex:ParsingException =>
+       logger.error(pex.getMessage, pex)
+       new InterpreterResult(Code.ERROR, pex.getMessage)
+     case iex: InterpreterException =>
+       logger.error(iex.getMessage, iex)
+       new InterpreterResult(Code.ERROR, iex.getMessage)
+     case ex: java.lang.Exception =>
+       logger.error(ex.getMessage, ex)
+       new InterpreterResult(Code.ERROR, parseException(ex))
+   }
+ }
+
+ /**
+  * Render the result set of the last executed statement.
+  *
+  * When at least one row was returned the output is a "%table" payload with
+  * tab separated columns and one line per row; otherwise the execution
+  * information of the statement is displayed.
+  *
+  * @param lastResultSet result set paired with the statement that produced it
+  * @param protocolVersion protocol version used to deserialize column values
+  * @return a SUCCESS interpreter result wrapping the rendered text
+  */
+ def buildResponseMessage(lastResultSet: (ResultSet,Statement), protocolVersion: ProtocolVersion): InterpreterResult = {
+   val (resultSet, lastStatement) = lastResultSet
+
+   // Drain the result set before looking at the column definitions
+   val rows: List[Row] = resultSet.iterator().asScala.toList
+
+   val columns: List[(String, DataType)] = resultSet
+     .getColumnDefinitions
+     .asList
+     .asScala
+     .toList
+     .map(definition => (definition.getName, definition.getType))
+
+   val result: String =
+     if (rows.isEmpty) {
+       enhancedSession.displayExecutionStatistics(lastStatement.toString, resultSet.getExecutionInfo)
+     } else {
+       val header = "%table " + columns.map(_._1).mkString("\t") + "\n"
+       val lines = rows.map { row =>
+         val cells = columns.map { case (name, dataType) =>
+           // A null column stays null and is printed as the text "null"
+           if (row.isNull(name)) null else dataType.deserialize(row.getBytesUnsafe(name), protocolVersion)
+         }
+         cells.mkString("\t") + "\n"
+       }
+       header + lines.mkString
+     }
+
+   logger.debug(s"CQL result : \n\n$result\n")
+   new InterpreterResult(Code.SUCCESS, result)
+ }
+
+ /**
+  * Parse the raw paragraph text into parameter and statement blocks.
+  *
+  * @param input raw text of the paragraph
+  * @return the parsed blocks, in the order produced by the parser
+  * @throws InterpreterException when the input cannot be parsed
+  */
+ def parseInput(input:String): List[AnyBlock] = {
+   // Failure and Error are reported with the same hint
+   def semiColonHint: String =
+     s"Error parsing input:\n\t'$input'\nDid you forget to add ; (semi-colon) at the end of each CQL statement ?"
+
+   paragraphParser.parseAll(paragraphParser.queries, input) match {
+     case paragraphParser.Success(blocks,_) => blocks
+     case paragraphParser.Failure(_,_) | paragraphParser.Error(_,_) => throw new InterpreterException(semiColonHint)
+     case _ => throw new InterpreterException(s"Error parsing input: $input")
+   }
+ }
+
+ /**
+  * Collapse the parameter blocks of a paragraph into a single set of query
+  * options. For each kind of parameter the first usable occurrence wins;
+  * a kind that is absent yields None.
+  *
+  * @param parameters parameter blocks, in paragraph order
+  * @return the query options to apply to the generated statements
+  */
+ def extractQueryOptions(parameters: List[QueryParameters]): CassandraQueryOptions = {
+
+   logger.debug(s"Extracting query options from $parameters")
+
+   val consistency: Option[ConsistencyLevel] = (for {
+     param <- parameters if param.paramType == ConsistencyParam
+     level <- Option(param.getParam[Consistency].value)
+   } yield level).headOption
+
+   val serialConsistency: Option[ConsistencyLevel] = (for {
+     param <- parameters if param.paramType == SerialConsistencyParam
+     level <- Option(param.getParam[SerialConsistency].value)
+   } yield level).headOption
+
+   val timestamp: Option[Long] = (for {
+     param <- parameters if param.paramType == TimestampParam
+     value <- Option(param.getParam[Timestamp].value)
+   } yield value).headOption
+
+   val retryPolicy: Option[RetryPolicy] = parameters
+     .collect { case param if param.paramType == RetryPolicyParam => param.getParam[RetryPolicy] }
+     .headOption
+
+   val fetchSize: Option[Int] = (for {
+     param <- parameters if param.paramType == FetchSizeParam
+     size <- Option(param.getParam[FetchSize].value)
+   } yield size).headOption
+
+   CassandraQueryOptions(consistency,serialConsistency, timestamp, retryPolicy, fetchSize)
+ }
+
+ /**
+  * Build a driver SimpleStatement from a parsed simple statement: resolve
+  * the variables found in its text, then apply the query options.
+  *
+  * @param st parsed simple statement
+  * @param options query options to apply
+  * @param context interpreter context used to resolve variables
+  * @return the configured statement
+  */
+ def generateSimpleStatement(st: SimpleStm, options: CassandraQueryOptions,context: InterpreterContext): SimpleStatement = {
+   logger.debug(s"Generating simple statement : '${st.text}'")
+   val resolvedText = maybeExtractVariables(st.text, context)
+   val simple = new SimpleStatement(resolvedText)
+   applyQueryOptions(options, simple)
+   simple
+ }
+
+ /**
+  * Build a driver BoundStatement from a previously prepared statement:
+  * resolve the variables found in the bound values, bind them, then apply
+  * the query options as is done for simple and batch statements.
+  *
+  * @param st parsed bound statement (prepared statement name + raw values)
+  * @param options query options to apply
+  * @param context interpreter context used to resolve variables
+  * @return the bound and configured statement
+  * @throws InterpreterException when no statement was prepared under that name
+  */
+ def generateBoundStatement(st: BoundStm, options: CassandraQueryOptions,context: InterpreterContext): BoundStatement = {
+   logger.debug(s"Generating bound statement with name : '${st.name}' and bound values : ${st.values}")
+   preparedStatements.get(st.name) match {
+     case Some(ps) =>
+       val boundValues = maybeExtractVariables(st.values, context)
+       val statement = createBoundStatement(st.name, ps, boundValues)
+       // Without this call the paragraph's query options would be ignored for bound statements
+       applyQueryOptions(options, statement)
+       statement
+     case None => throw new InterpreterException(s"The statement '${st.name}' can not be bound to values. " +
+       s"Are you sure you did prepare it with @prepare[${st.name}] ?")
+   }
+ }
+
+ /**
+  * Group already built statements into a driver BatchStatement and apply
+  * the query options to the batch.
+  *
+  * @param batchType type of batch to create
+  * @param options query options to apply to the batch
+  * @param statements statements to add, in order
+  * @return the configured batch
+  */
+ def generateBatchStatement(batchType: BatchStatement.Type, options: CassandraQueryOptions, statements: List[Statement]): BatchStatement = {
+   logger.debug(s"""Generating batch statement of type '${batchType} for ${statements.mkString(",")}'""")
+   val batch = new BatchStatement(batchType)
+   for (statement <- statements) {
+     batch.add(statement)
+   }
+   applyQueryOptions(options, batch)
+   batch
+ }
+
+ /**
+  * Replace every double-curly-brace variable placeholder of a statement by
+  * the value obtained from the paragraph GUI: a select form when the
+  * placeholder lists several choices separated by '|', an input form with
+  * a default value otherwise.
+  *
+  * @param statement text that may contain variable placeholders
+  * @param context interpreter context giving access to the GUI forms
+  * @return the statement with every placeholder substituted
+  * @throws ParsingException when a placeholder matches neither accepted form
+  */
+ def maybeExtractVariables(statement: String, context: InterpreterContext): String = {
+
+   def extractVariableAndDefaultValue(statement: String, exp: String):String = {
+     exp match {
+       case MULTIPLE_CHOICES_VARIABLE_DEFINITION_PATTERN(variable,choices) => {
+         val listChoices:List[String] = choices.trim.split(CHOICES_SEPARATOR).toList
+         val paramOptions= listChoices.map(choice => new ParamOption(choice, choice))
+         val selected = context.getGui.select(variable, listChoices.head, paramOptions.toArray)
+         // Literal substitution: neither the placeholder nor the chosen value is
+         // interpreted as a regular expression or a replacement pattern
+         statement.replace(exp, selected.toString)
+       }
+       case SIMPLE_VARIABLE_DEFINITION_PATTERN(variable,defaultVal) => {
+         val value = context.getGui.input(variable,defaultVal)
+         // Literal substitution, see above
+         statement.replace(exp, value.toString)
+       }
+       case _ => throw new ParsingException(s"Invalid bound variable definition for '$exp' in '$statement'. It should be of form 'variable=defaultValue' or 'variable=value1|value2|...|valueN'")
+     }
+   }
+
+   VARIABLE_PATTERN.findAllIn(statement).foldLeft(statement)(extractVariableAndDefaultValue _)
+ }
+
+ /**
+  * Apply the defined query options to a statement, in this order:
+  * consistency, serial consistency, default timestamp, retry policy and
+  * fetch size. Undefined options leave the statement untouched.
+  *
+  * @param options query options extracted from the paragraph
+  * @param statement statement to configure (mutated in place)
+  * @throws InterpreterException when the retry policy is not a known one
+  */
+ def applyQueryOptions(options: CassandraQueryOptions, statement: Statement): Unit = {
+   for (level <- options.consistency) statement.setConsistencyLevel(level)
+   for (level <- options.serialConsistency) statement.setSerialConsistencyLevel(level)
+   for (timestamp <- options.timestamp) statement.setDefaultTimestamp(timestamp)
+
+   for (policy <- options.retryPolicy) {
+     // Translate the selector into the matching shared driver policy instance
+     val driverPolicy = policy match {
+       case DefaultRetryPolicy => defaultRetryPolicy
+       case DowngradingRetryPolicy => downgradingConsistencyRetryPolicy
+       case FallThroughRetryPolicy => fallThroughRetryPolicy
+       case LoggingDefaultRetryPolicy => loggingDefaultRetryPolicy
+       case LoggingDowngradingRetryPolicy => loggingDownGradingRetryPolicy
+       case LoggingFallThroughRetryPolicy => loggingFallThrougRetryPolicy
+       case _ => throw new InterpreterException(s"""Unknown retry policy ${options.retryPolicy.getOrElse("???")}""")
+     }
+     statement.setRetryPolicy(driverPolicy)
+   }
+
+   for (size <- options.fetchSize) statement.setFetchSize(size)
+ }
+
+ /**
+  * Bind textual values to a prepared statement, converting each of them to
+  * the Java type matching the CQL type of the corresponding bind variable.
+  * The literal text null binds a null value.
+  *
+  * @param name name of the prepared statement, used in error messages
+  * @param ps prepared statement to bind
+  * @param rawBoundValues raw, comma separated bound values
+  * @return the bound statement
+  * @throws InterpreterException when the number of values differs from the
+  *         number of bind variables or when a CQL type is not supported
+  */
+ private def createBoundStatement(name: String, ps: PreparedStatement, rawBoundValues: String): BoundStatement = {
+   val dataTypes = ps.getVariables.toList
+     .map(cfDef => cfDef.getType)
+
+   val boundValuesAsText = parseBoundValues(name,rawBoundValues)
+
+   if(dataTypes.size != boundValuesAsText.size) throw new InterpreterException(s"Invalid @bind values for prepared statement '$name'. " +
+     s"Prepared parameters has ${dataTypes.size} variables whereas bound values have ${boundValuesAsText.size} parameters ...")
+
+   // Convert one textual value; primitives are boxed explicitly so that the
+   // result is a List[AnyRef] without any unchecked cast
+   def convert(rawValue: String, dataType: DataType): AnyRef = {
+     val value = rawValue.trim
+     if(value == "null") {
+       null
+     } else {
+       dataType.getName match {
+         case (ASCII | TEXT | VARCHAR) => value.replaceAll("(?<!')'","")
+         case INT => Int.box(value.toInt)
+         // varint is arbitrary precision: the driver expects java.math.BigInteger
+         case VARINT => new java.math.BigInteger(value)
+         case (BIGINT | COUNTER) => Long.box(value.toLong)
+         case BLOB => ByteBuffer.wrap(value.getBytes)
+         case BOOLEAN => Boolean.box(value.toBoolean)
+         // the driver expects java.math.BigDecimal, not scala.math.BigDecimal
+         case DECIMAL => new java.math.BigDecimal(value)
+         case DOUBLE => Double.box(value.toDouble)
+         case FLOAT => Float.box(value.toFloat)
+         case INET => InetAddress.getByName(value)
+         case TIMESTAMP => parseDate(value)
+         case (UUID | TIMEUUID) => java.util.UUID.fromString(value)
+         // Collections, UDT and tuples are parsed from the untrimmed text
+         case LIST => dataType.parse(boundValuesParser.parse(boundValuesParser.list, rawValue).get)
+         case SET => dataType.parse(boundValuesParser.parse(boundValuesParser.set, rawValue).get)
+         case MAP => dataType.parse(boundValuesParser.parse(boundValuesParser.map, rawValue).get)
+         case UDT => dataType.parse(boundValuesParser.parse(boundValuesParser.udt, rawValue).get)
+         case TUPLE => dataType.parse(boundValuesParser.parse(boundValuesParser.tuple, rawValue).get)
+         case _ => throw new InterpreterException(s"Cannot parse data of type : ${dataType.toString}")
+       }
+     }
+   }
+
+   val convertedValues: List[AnyRef] = boundValuesAsText
+     .zip(dataTypes)
+     .map { case (value, dataType) => convert(value, dataType) }
+
+   ps.bind(convertedValues.toArray: _*)
+ }
+
+ /**
+  * Split the raw bound values of a prepared statement into one text per value.
+  *
+  * @param psName name of the prepared statement, used in the error message
+  * @param boundValues raw bound values
+  * @return the textual values, in order
+  * @throws InterpreterException when the values cannot be parsed
+  */
+ protected def parseBoundValues(psName: String, boundValues: String): List[String] = {
+   boundValuesParser.parseAll(boundValuesParser.values, boundValues) match {
+     case boundValuesParser.Success(parsedValues,_) => parsedValues
+     case _ => throw new InterpreterException(s"Cannot parse bound values for prepared statement '$psName' : $boundValues. Did you forget to wrap text with ' (simple quote) ?")
+   }
+ }
+
+ /**
+  * Parse a date given in one of the two accepted formats.
+  *
+  * @param dateString text matching the standard or the accurate date pattern
+  * @return the parsed date
+  * @throws InterpreterException when the text matches neither pattern
+  */
+ def parseDate(dateString: String): Date = {
+   // A fresh formatter is created for every call
+   def parseWith(format: String, text: String): Date = new SimpleDateFormat(format).parse(text)
+
+   dateString match {
+     case boundValuesParser.STANDARD_DATE_PATTERN(datePattern) => parseWith(STANDARD_DATE_FORMAT, datePattern)
+     case boundValuesParser.ACCURATE_DATE_PATTERN(datePattern) => parseWith(ACCURATE_DATE_FORMAT, datePattern)
+     case _ =>
+       throw new InterpreterException(s"Cannot parse date '$dateString'. " +
+         s"Accepted formats : $STANDARD_DATE_FORMAT OR $ACCURATE_DATE_FORMAT")
+   }
+ }
+
+ /**
+  * Render the stack trace of an exception as text.
+  *
+  * @param ex exception to render
+  * @return the full stack trace, as printed by printStackTrace
+  */
+ def parseException(ex: Exception): String = {
+   val os = new ByteArrayOutputStream()
+   // Encode with the charset used for decoding below instead of the platform
+   // default, so that non-ASCII messages survive the round trip
+   val ps = new PrintStream(os, true, "UTF-8")
+   try {
+     ex.printStackTrace(ps)
+   } finally {
+     ps.close()
+   }
+   os.toString("UTF-8")
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b9583c6e/cassandra/src/main/scala/org/apache/zeppelin/cassandra/JavaDriverConfig.scala
----------------------------------------------------------------------
diff --git a/cassandra/src/main/scala/org/apache/zeppelin/cassandra/JavaDriverConfig.scala b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/JavaDriverConfig.scala
new file mode 100644
index 0000000..d64ad90
--- /dev/null
+++ b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/JavaDriverConfig.scala
@@ -0,0 +1,430 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.cassandra
+
+import java.lang.Boolean._
+
+import com.datastax.driver.core.HostDistance._
+import com.datastax.driver.core.ProtocolOptions.Compression
+import com.datastax.driver.core._
+import com.datastax.driver.core.policies._
+import org.apache.commons.lang3.StringUtils._
+import org.apache.zeppelin.interpreter.Interpreter
+import org.apache.zeppelin.cassandra.CassandraInterpreter._
+import org.slf4j.LoggerFactory
+
+/**
+ * Utility class to extract and configure the Java driver
+ */
+class JavaDriverConfig {
+
+ val LOGGER = LoggerFactory.getLogger(classOf[JavaDriverConfig])
+
+ /**
+  * Build the driver socket options from the interpreter properties.
+  * Connection timeout, read timeout and TCP no-delay are always set; keep
+  * alive, receive/send buffer sizes, reuse address and SO linger are only
+  * set when their property is not blank. The retained values are logged at
+  * debug level.
+  *
+  * @param intpr interpreter providing the properties
+  * @return the configured socket options
+  */
+ def getSocketOptions(intpr: Interpreter): SocketOptions = {
+   val socketOptions: SocketOptions = new SocketOptions
+   val summary: StringBuilder = new StringBuilder("Socket options : \n\n")
+
+   // Add one "<tab>property : value" line to the debug summary
+   def record(property: String, value: Any): Unit = {
+     summary.append("\t").append(property).append(" : ").append(value).append("\n")
+   }
+
+   // Run the action on the property value only when it is not blank
+   def whenDefined(property: String)(action: String => Unit): Unit = {
+     val raw: String = intpr.getProperty(property)
+     if (isNotBlank(raw)) action(raw)
+   }
+
+   val connectTimeoutMillis: Int = intpr.getProperty(CASSANDRA_SOCKET_CONNECTION_TIMEOUT_MILLIS).toInt
+   socketOptions.setConnectTimeoutMillis(connectTimeoutMillis)
+   record(CASSANDRA_SOCKET_CONNECTION_TIMEOUT_MILLIS, connectTimeoutMillis)
+
+   val readTimeoutMillis: Int = intpr.getProperty(CASSANDRA_SOCKET_READ_TIMEOUT_MILLIS).toInt
+   socketOptions.setReadTimeoutMillis(readTimeoutMillis)
+   record(CASSANDRA_SOCKET_READ_TIMEOUT_MILLIS, readTimeoutMillis)
+
+   val tcpNoDelay: Boolean = parseBoolean(intpr.getProperty(CASSANDRA_SOCKET_TCP_NO_DELAY))
+   socketOptions.setTcpNoDelay(tcpNoDelay)
+   record(CASSANDRA_SOCKET_TCP_NO_DELAY, tcpNoDelay)
+
+   whenDefined(CASSANDRA_SOCKET_KEEP_ALIVE) { raw =>
+     val keepAliveValue: Boolean = parseBoolean(raw)
+     socketOptions.setKeepAlive(keepAliveValue)
+     record(CASSANDRA_SOCKET_KEEP_ALIVE, keepAliveValue)
+   }
+
+   whenDefined(CASSANDRA_SOCKET_RECEIVED_BUFFER_SIZE_BYTES) { raw =>
+     val receiveBufferSizeValue: Int = raw.toInt
+     socketOptions.setReceiveBufferSize(receiveBufferSizeValue)
+     record(CASSANDRA_SOCKET_RECEIVED_BUFFER_SIZE_BYTES, receiveBufferSizeValue)
+   }
+
+   whenDefined(CASSANDRA_SOCKET_SEND_BUFFER_SIZE_BYTES) { raw =>
+     val sendBufferSizeValue: Int = raw.toInt
+     socketOptions.setSendBufferSize(sendBufferSizeValue)
+     record(CASSANDRA_SOCKET_SEND_BUFFER_SIZE_BYTES, sendBufferSizeValue)
+   }
+
+   whenDefined(CASSANDRA_SOCKET_REUSE_ADDRESS) { raw =>
+     val reuseAddressValue: Boolean = parseBoolean(raw)
+     socketOptions.setReuseAddress(reuseAddressValue)
+     record(CASSANDRA_SOCKET_REUSE_ADDRESS, reuseAddressValue)
+   }
+
+   whenDefined(CASSANDRA_SOCKET_SO_LINGER) { raw =>
+     val soLingerValue: Int = raw.toInt
+     socketOptions.setSoLinger(soLingerValue)
+     record(CASSANDRA_SOCKET_SO_LINGER, soLingerValue)
+   }
+
+   LOGGER.debug(summary.append("\n").toString)
+
+   socketOptions
+ }
+
+ /**
+  * Build the driver query options from the interpreter properties: default
+  * consistency, default serial consistency, default fetch size and default
+  * idempotence. The retained values are logged at debug level.
+  *
+  * @param intpr interpreter providing the properties
+  * @return the configured query options
+  */
+ def getQueryOptions(intpr: Interpreter): QueryOptions = {
+   val queryOptions: QueryOptions = new QueryOptions
+   val summary: StringBuilder = new StringBuilder("Query options : \n\n")
+
+   // Add one "<tab>property : value" line to the debug summary
+   def record(property: String, value: Any): Unit = {
+     summary.append("\t").append(property).append(" : ").append(value).append("\n")
+   }
+
+   val consistencyLevel: ConsistencyLevel = ConsistencyLevel.valueOf(intpr.getProperty(CASSANDRA_QUERY_DEFAULT_CONSISTENCY))
+   queryOptions.setConsistencyLevel(consistencyLevel)
+   record(CASSANDRA_QUERY_DEFAULT_CONSISTENCY, consistencyLevel)
+
+   val serialConsistencyLevel: ConsistencyLevel = ConsistencyLevel.valueOf(intpr.getProperty(CASSANDRA_QUERY_DEFAULT_SERIAL_CONSISTENCY))
+   queryOptions.setSerialConsistencyLevel(serialConsistencyLevel)
+   record(CASSANDRA_QUERY_DEFAULT_SERIAL_CONSISTENCY, serialConsistencyLevel)
+
+   val fetchSize: Int = intpr.getProperty(CASSANDRA_QUERY_DEFAULT_FETCH_SIZE).toInt
+   queryOptions.setFetchSize(fetchSize)
+   record(CASSANDRA_QUERY_DEFAULT_FETCH_SIZE, fetchSize)
+
+   val defaultIdempotence: Boolean = parseBoolean(intpr.getProperty(CASSANDRA_QUERY_DEFAULT_IDEMPOTENCE))
+   queryOptions.setDefaultIdempotence(defaultIdempotence)
+   record(CASSANDRA_QUERY_DEFAULT_IDEMPOTENCE, defaultIdempotence)
+
+   LOGGER.debug(summary.append("\n").toString)
+
+   queryOptions
+ }
+
+ /**
+  * Resolve the native protocol version from the interpreter properties and,
+  * as a side effect, overwrite the DEFAULT_* pooling values with the set
+  * matching that version: "1" and "2" share one set, "3" and any other
+  * value share another one.
+  *
+  * @param intpr interpreter providing the properties
+  * @return V1, V2 or V3 for "1", "2" or "3", NEWEST_SUPPORTED otherwise
+  */
+ def getProtocolVersion(intpr: Interpreter): ProtocolVersion = {
+   val protocolVersion: String = intpr.getProperty(CASSANDRA_PROTOCOL_VERSION)
+   LOGGER.debug("Protocol version : " + protocolVersion)
+
+   // Pooling values used for protocol versions 1 and 2
+   def useV1V2PoolingDefaults(): Unit = {
+     DEFAULT_MAX_CONNECTION_PER_HOST_LOCAL = "8"
+     DEFAULT_MAX_CONNECTION_PER_HOST_REMOTE = "2"
+     DEFAULT_CORE_CONNECTION_PER_HOST_LOCAL = "2"
+     DEFAULT_CORE_CONNECTION_PER_HOST_REMOTE = "1"
+     DEFAULT_NEW_CONNECTION_THRESHOLD_LOCAL = "100"
+     DEFAULT_NEW_CONNECTION_THRESHOLD_REMOTE = "1"
+     DEFAULT_MAX_REQUEST_PER_CONNECTION_LOCAL = "128"
+     DEFAULT_MAX_REQUEST_PER_CONNECTION_REMOTE = "128"
+   }
+
+   // Pooling values used for protocol version 3 and for any other value
+   def useV3PoolingDefaults(): Unit = {
+     DEFAULT_MAX_CONNECTION_PER_HOST_LOCAL = "1"
+     DEFAULT_MAX_CONNECTION_PER_HOST_REMOTE = "1"
+     DEFAULT_CORE_CONNECTION_PER_HOST_LOCAL = "1"
+     DEFAULT_CORE_CONNECTION_PER_HOST_REMOTE = "1"
+     DEFAULT_NEW_CONNECTION_THRESHOLD_LOCAL = "800"
+     DEFAULT_NEW_CONNECTION_THRESHOLD_REMOTE = "200"
+     DEFAULT_MAX_REQUEST_PER_CONNECTION_LOCAL = "1024"
+     DEFAULT_MAX_REQUEST_PER_CONNECTION_REMOTE = "256"
+   }
+
+   protocolVersion match {
+     case "1" =>
+       useV1V2PoolingDefaults()
+       ProtocolVersion.V1
+     case "2" =>
+       useV1V2PoolingDefaults()
+       ProtocolVersion.V2
+     case "3" =>
+       useV3PoolingDefaults()
+       ProtocolVersion.V3
+     case _ =>
+       useV3PoolingDefaults()
+       ProtocolVersion.NEWEST_SUPPORTED
+   }
+ }
+
+ /**
+  * Builds the driver pooling options from the interpreter's pooling properties.
+  *
+  * Every pooling property is read as an integer, applied to the pooling options
+  * and recorded in a summary that is logged at debug level.
+  *
+  * @param intpr interpreter whose properties hold the pooling configuration
+  * @return the configured pooling options
+  * @throws NumberFormatException if a pooling property is not a valid integer;
+  *                               the message names the offending property
+  */
+ def getPoolingOptions(intpr: Interpreter): PoolingOptions = {
+   val poolingOptions: PoolingOptions = new PoolingOptions
+   val poolingOptionsInfo: StringBuilder = new StringBuilder("Pooling options : \n\n")
+
+   // Reads one integer property, naming the key when its value cannot be parsed.
+   def intProperty(key: String): Int = {
+     val raw: String = intpr.getProperty(key)
+     try {
+       raw.toInt
+     } catch {
+       case e: NumberFormatException =>
+         val detailed = new NumberFormatException(s"Invalid integer value '$raw' for property $key")
+         detailed.initCause(e)
+         throw detailed
+     }
+   }
+
+   // Reads the property, applies it through `setter`, then records it in the debug summary.
+   def configure(key: String)(setter: Int => Any): Unit = {
+     val value: Int = intProperty(key)
+     setter(value)
+     poolingOptionsInfo
+       .append("\t")
+       .append(key)
+       .append(" : ")
+       .append(value)
+       .append("\n")
+   }
+
+   // NOTE(review): the driver may validate these settings against each other
+   // (e.g. core vs. max connections), so keep this application order - confirm.
+   configure(CASSANDRA_POOLING_MAX_CONNECTION_PER_HOST_LOCAL) { value =>
+     poolingOptions.setMaxConnectionsPerHost(LOCAL, value)
+   }
+   configure(CASSANDRA_POOLING_MAX_CONNECTION_PER_HOST_REMOTE) { value =>
+     poolingOptions.setMaxConnectionsPerHost(REMOTE, value)
+   }
+   configure(CASSANDRA_POOLING_CORE_CONNECTION_PER_HOST_LOCAL) { value =>
+     poolingOptions.setCoreConnectionsPerHost(LOCAL, value)
+   }
+   configure(CASSANDRA_POOLING_CORE_CONNECTION_PER_HOST_REMOTE) { value =>
+     poolingOptions.setCoreConnectionsPerHost(REMOTE, value)
+   }
+   configure(CASSANDRA_POOLING_NEW_CONNECTION_THRESHOLD_LOCAL) { value =>
+     poolingOptions.setNewConnectionThreshold(LOCAL, value)
+   }
+   configure(CASSANDRA_POOLING_NEW_CONNECTION_THRESHOLD_REMOTE) { value =>
+     poolingOptions.setNewConnectionThreshold(REMOTE, value)
+   }
+   configure(CASSANDRA_POOLING_MAX_REQUESTS_PER_CONNECTION_LOCAL) { value =>
+     poolingOptions.setMaxRequestsPerConnection(LOCAL, value)
+   }
+   configure(CASSANDRA_POOLING_MAX_REQUESTS_PER_CONNECTION_REMOTE) { value =>
+     poolingOptions.setMaxRequestsPerConnection(REMOTE, value)
+   }
+   configure(CASSANDRA_POOLING_HEARTBEAT_INTERVAL_SECONDS) { value =>
+     poolingOptions.setHeartbeatIntervalSeconds(value)
+   }
+   configure(CASSANDRA_POOLING_IDLE_TIMEOUT_SECONDS) { value =>
+     poolingOptions.setIdleTimeoutSeconds(value)
+   }
+   configure(CASSANDRA_POOLING_POOL_TIMEOUT_MILLIS) { value =>
+     poolingOptions.setPoolTimeoutMillis(value)
+   }
+
+   LOGGER.debug(poolingOptionsInfo.append("\n").toString)
+
+   poolingOptions
+ }
+
+ /**
+  * Resolves the compression to use from the interpreter's compression property.
+  *
+  * The property value is compared case-insensitively. "SNAPPY" and "LZ4" select
+  * the matching compression; any other value, including a missing property,
+  * selects no compression.
+  *
+  * @param intpr interpreter whose properties hold the compression setting
+  * @return the selected compression, never null
+  */
+ def getCompressionProtocol(intpr: Interpreter): ProtocolOptions.Compression = {
+   val compressionProtocol: String = intpr.getProperty(CASSANDRA_COMPRESSION_PROTOCOL)
+
+   LOGGER.debug("Compression protocol : " + compressionProtocol)
+
+   // Option(...) turns a missing (null) property into None so it falls through
+   // to the NONE default instead of leaking a null result to the caller.
+   Option(compressionProtocol).map(_.toUpperCase) match {
+     case Some("SNAPPY") => Compression.SNAPPY
+     case Some("LZ4") => Compression.LZ4
+     case _ => Compression.NONE
+   }
+ }
+
+ /**
+  * Resolves the load balancing policy from the interpreter's properties.
+  *
+  * A blank value or the default-policy marker selects the driver's default
+  * policy; any other value is treated as a class name and instantiated through
+  * its no-argument constructor.
+  *
+  * @param intpr interpreter whose properties hold the policy setting
+  * @return the load balancing policy to use
+  * @throws RuntimeException if the configured class cannot be loaded or
+  *                          instantiated; the original failure is attached as cause
+  */
+ def getLoadBalancingPolicy(intpr: Interpreter): LoadBalancingPolicy = {
+   val loadBalancingPolicy: String = intpr.getProperty(CASSANDRA_LOAD_BALANCING_POLICY)
+   LOGGER.debug("Load Balancing Policy : " + loadBalancingPolicy)
+
+   if (isBlank(loadBalancingPolicy) || (DEFAULT_POLICY == loadBalancingPolicy)) {
+     Policies.defaultLoadBalancingPolicy
+   } else {
+     try {
+       Class.forName(loadBalancingPolicy).asInstanceOf[Class[LoadBalancingPolicy]].newInstance
+     } catch {
+       // Class loading and reflection failures only; VM errors are left to propagate.
+       case e @ (_: Exception | _: LinkageError) =>
+         throw new RuntimeException(
+           "Cannot instantiate " + CASSANDRA_LOAD_BALANCING_POLICY + " = " + loadBalancingPolicy, e)
+     }
+   }
+ }
+
+ /**
+  * Resolves the retry policy from the interpreter's properties.
+  *
+  * A blank value or the default-policy marker selects the driver's default
+  * policy; any other value is treated as a class name and instantiated through
+  * its no-argument constructor.
+  *
+  * @param intpr interpreter whose properties hold the policy setting
+  * @return the retry policy to use
+  * @throws RuntimeException if the configured class cannot be loaded or
+  *                          instantiated; the original failure is attached as cause
+  */
+ def getRetryPolicy(intpr: Interpreter): RetryPolicy = {
+   val retryPolicy: String = intpr.getProperty(CASSANDRA_RETRY_POLICY)
+   LOGGER.debug("Retry Policy : " + retryPolicy)
+
+   if (isBlank(retryPolicy) || (DEFAULT_POLICY == retryPolicy)) {
+     Policies.defaultRetryPolicy
+   } else {
+     try {
+       Class.forName(retryPolicy).asInstanceOf[Class[RetryPolicy]].newInstance
+     } catch {
+       // Class loading and reflection failures only; VM errors are left to propagate.
+       case e @ (_: Exception | _: LinkageError) =>
+         throw new RuntimeException(
+           "Cannot instantiate " + CASSANDRA_RETRY_POLICY + " = " + retryPolicy, e)
+     }
+   }
+ }
+
+ /**
+  * Resolves the reconnection policy from the interpreter's properties.
+  *
+  * A blank value or the default-policy marker selects the driver's default
+  * policy; any other value is treated as a class name and instantiated through
+  * its no-argument constructor.
+  *
+  * @param intpr interpreter whose properties hold the policy setting
+  * @return the reconnection policy to use
+  * @throws RuntimeException if the configured class cannot be loaded or
+  *                          instantiated; the original failure is attached as cause
+  */
+ def getReconnectionPolicy(intpr: Interpreter): ReconnectionPolicy = {
+   val reconnectionPolicy: String = intpr.getProperty(CASSANDRA_RECONNECTION_POLICY)
+   LOGGER.debug("Reconnection Policy : " + reconnectionPolicy)
+
+   if (isBlank(reconnectionPolicy) || (DEFAULT_POLICY == reconnectionPolicy)) {
+     Policies.defaultReconnectionPolicy
+   } else {
+     try {
+       Class.forName(reconnectionPolicy).asInstanceOf[Class[ReconnectionPolicy]].newInstance
+     } catch {
+       // Class loading and reflection failures only; VM errors are left to propagate.
+       case e @ (_: Exception | _: LinkageError) =>
+         throw new RuntimeException(
+           "Cannot instantiate " + CASSANDRA_RECONNECTION_POLICY + " = " + reconnectionPolicy, e)
+     }
+   }
+ }
+
+ /**
+  * Resolves the speculative execution policy from the interpreter's properties.
+  *
+  * A blank value or the default-policy marker selects the driver's default
+  * policy; any other value is treated as a class name and instantiated through
+  * its no-argument constructor.
+  *
+  * @param intpr interpreter whose properties hold the policy setting
+  * @return the speculative execution policy to use
+  * @throws RuntimeException if the configured class cannot be loaded or
+  *                          instantiated; the original failure is attached as cause
+  */
+ def getSpeculativeExecutionPolicy(intpr: Interpreter): SpeculativeExecutionPolicy = {
+   val specExecPolicy: String = intpr.getProperty(CASSANDRA_SPECULATIVE_EXECUTION_POLICY)
+   LOGGER.debug("Speculative Execution Policy : " + specExecPolicy)
+
+   if (isBlank(specExecPolicy) || (DEFAULT_POLICY == specExecPolicy)) {
+     Policies.defaultSpeculativeExecutionPolicy
+   } else {
+     try {
+       Class.forName(specExecPolicy).asInstanceOf[Class[SpeculativeExecutionPolicy]].newInstance
+     } catch {
+       // Class loading and reflection failures only; VM errors are left to propagate.
+       case e @ (_: Exception | _: LinkageError) =>
+         throw new RuntimeException(
+           "Cannot instantiate " + CASSANDRA_SPECULATIVE_EXECUTION_POLICY + " = " + specExecPolicy, e)
+     }
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b9583c6e/cassandra/src/main/scala/org/apache/zeppelin/cassandra/MetaDataHierarchy.scala
----------------------------------------------------------------------
diff --git a/cassandra/src/main/scala/org/apache/zeppelin/cassandra/MetaDataHierarchy.scala b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/MetaDataHierarchy.scala
new file mode 100644
index 0000000..66a0776
--- /dev/null
+++ b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/MetaDataHierarchy.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.cassandra
+
+import java.util.UUID
+
+import com.datastax.driver.core.utils.UUIDs
+import com.datastax.driver.core.{DataType, TableMetadata}
+
+import scala.util.parsing.json.JSONObject
+
+/**
+ * Value types describing CQL meta data: clusters, keyspaces, tables,
+ * user-defined types and their columns.
+ */
+object MetaDataHierarchy {
+
+  /** Translates the Java driver's clustering order into the local [[ClusteringOrder]]. */
+  object OrderConverter {
+    def convert(clusteringOrder: TableMetadata.Order): ClusteringOrder = clusteringOrder match {
+      case TableMetadata.Order.DESC => DESC
+      case TableMetadata.Order.ASC => ASC
+    }
+  }
+
+  /** Sort direction of a clustering column. */
+  sealed trait ClusteringOrder
+  object ASC extends ClusteringOrder
+  object DESC extends ClusteringOrder
+
+  /** Role a column plays inside its table definition. */
+  sealed trait ColumnType
+  object PartitionKey extends ColumnType
+  case class ClusteringColumn(order: ClusteringOrder) extends ColumnType
+  object StaticColumn extends ColumnType
+  object NormalColumn extends ColumnType
+
+  /** Name and free-form description of an index. */
+  case class IndexDetails(name: String, info: String)
+
+  /** A single column: its name, role, CQL data type and optional index. */
+  case class ColumnDetails(name: String, columnType: ColumnType, dataType: DataType, index: Option[IndexDetails])
+
+  case class ClusterDetails(name: String, partitioner: String)
+
+  case class ClusterContent(clusterName: String, clusterDetails: String, keyspaces: List[(UUID, String, String)])
+
+  case class AllTables(tables: Map[String,List[String]])
+
+  /** Keyspace description; a time-based UUID is generated as id unless one is supplied. */
+  case class KeyspaceDetails(name: String, replication: Map[String,String], durableWrites: Boolean, asCQL: String, uniqueId: UUID = UUIDs.timeBased()) {
+
+    /** Renders the replication map as JSON text with double quotes swapped for single quotes. */
+    def getReplicationMap: String = {
+      val json = JSONObject(replication).toString()
+      json.replace("\"", "'")
+    }
+  }
+
+  case class KeyspaceContent(keyspaceName: String, keyspaceDetails: String, tables: List[(UUID,String, String)], udts: List[(UUID, String, String)])
+
+  /** Table description; a time-based UUID is generated as id unless one is supplied. */
+  case class TableDetails(tableName: String, columns: List[ColumnDetails], asCQL: String, uniqueId: UUID = UUIDs.timeBased())
+
+  /** User-defined type description; a time-based UUID is generated as id unless one is supplied. */
+  case class UDTDetails(typeName: String, columns: List[ColumnDetails], asCQL: String, uniqueId: UUID = UUIDs.timeBased())
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b9583c6e/cassandra/src/main/scala/org/apache/zeppelin/cassandra/ParagraphParser.scala
----------------------------------------------------------------------
diff --git a/cassandra/src/main/scala/org/apache/zeppelin/cassandra/ParagraphParser.scala b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/ParagraphParser.scala
new file mode 100644
index 0000000..0cd98ad
--- /dev/null
+++ b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/ParagraphParser.scala
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.cassandra
+
+import com.datastax.driver.core._
+import org.apache.zeppelin.cassandra.CassandraInterpreter._
+import org.apache.zeppelin.interpreter.InterpreterException
+import scala.util.parsing.combinator._
+import org.apache.zeppelin.cassandra.TextBlockHierarchy._
+
+/**
+ * Regular expressions used by the [[ParagraphParser]] class to validate
+ * and extract the parts of each paragraph block.
+ */
+object ParagraphParser {
+  // @consistency accepts every driver consistency level whose name does not contain SERIAL
+  val CONSISTENCY_LEVEL_PATTERN = ConsistencyLevel.values().toList
+    .map(_.name()).filterNot(_.contains("SERIAL")).mkString("""^\s*@consistency\s*=\s*(""", "|" , """)\s*$""").r
+
+  // @serialConsistency accepts only the levels whose name contains SERIAL
+  val SERIAL_CONSISTENCY_LEVEL_PATTERN = ConsistencyLevel.values().toList
+    .map(_.name()).filter(_.contains("SERIAL")).mkString("""^\s*@serialConsistency\s*=\s*(""", "|", """)\s*$""").r
+  val TIMESTAMP_PATTERN = """^\s*@timestamp\s*=\s*([0-9]+)\s*$""".r
+
+  val RETRY_POLICIES_PATTERN = List(DEFAULT_POLICY,DOWNGRADING_CONSISTENCY_RETRY, FALLTHROUGH_RETRY,
+    LOGGING_DEFAULT_RETRY, LOGGING_DOWNGRADING_RETRY, LOGGING_FALLTHROUGH_RETRY)
+    .mkString("""^\s*@retryPolicy\s*=\s*(""", "|" , """)\s*$""").r
+  val FETCHSIZE_PATTERN = """^\s*@fetchSize\s*=\s*([0-9]+)\s*$""".r
+
+  val SIMPLE_STATEMENT_PATTERN = """([^;]+;)""".r
+  val PREPARE_STATEMENT_PATTERN = """^\s*@prepare\[([^]]+)\]\s*=\s*([^;]+)$""".r
+  val REMOVE_PREPARE_STATEMENT_PATTERN = """^\s*@remove_prepare\[([^]]+)\]\s*$""".r
+
+  val BIND_PATTERN = """^\s*@bind\[([^]]+)\](?:=([^;]+))?""".r
+  val BATCH_PATTERN = """^(?i)\s*BEGIN\s+(UNLOGGED|COUNTER)?\s*BATCH""".r
+
+  // Leading keyword of a plain CQL statement. Kept on a single line on purpose:
+  // a line break inside the literal would become part of the regular expression
+  // and add a whitespace-only alternative to the keyword list.
+  val GENERIC_STATEMENT_PREFIX =
+    """(?is)\s*(?:INSERT|UPDATE|DELETE|SELECT|CREATE|ALTER|DROP|GRANT|REVOKE|TRUNCATE|LIST|USE)\s+""".r
+
+  val VALID_IDENTIFIER = "[a-z][a-z0-9_]*"
+
+  val DESCRIBE_CLUSTER_PATTERN = """^(?i)\s*(?:DESCRIBE|DESC)\s+CLUSTER;\s*$""".r
+  val DESCRIBE_KEYSPACES_PATTERN = """^(?i)\s*(?:DESCRIBE|DESC)\s+KEYSPACES;\s*$""".r
+  val DESCRIBE_TABLES_PATTERN = """^(?i)\s*(?:DESCRIBE|DESC)\s+TABLES;\s*$""".r
+  val DESCRIBE_KEYSPACE_PATTERN = ("""^(?i)\s*(?:DESCRIBE|DESC)\s+KEYSPACE\s*("""+VALID_IDENTIFIER+""");\s*$""").r
+  val DESCRIBE_TABLE_PATTERN = ("""^(?i)\s*(?:DESCRIBE|DESC)\s+TABLE\s*("""+VALID_IDENTIFIER+""");\s*$""").r
+  val DESCRIBE_TABLE_WITH_KEYSPACE_PATTERN = ("""^(?i)\s*(?:DESCRIBE|DESC)\s+TABLE\s*(""" +
+    VALID_IDENTIFIER +
+    """)\.(""" +
+    VALID_IDENTIFIER +
+    """);\s*$""").r
+
+  val DESCRIBE_TYPE_PATTERN = ("""^(?i)\s*(?:DESCRIBE|DESC)\s+TYPE\s*("""+VALID_IDENTIFIER+""");\s*$""").r
+  val DESCRIBE_TYPE_WITH_KEYSPACE_PATTERN = ("""^(?i)\s*(?:DESCRIBE|DESC)\s+TYPE\s*(""" +
+    VALID_IDENTIFIER +
+    """)\.(""" +
+    VALID_IDENTIFIER +
+    """);\s*$""").r
+
+  val HELP_PATTERN = """^(?i)\s*HELP;\s*$""".r
+}
+
+/**
+ * Parser combinators that split a paragraph into typed blocks: comments,
+ * query parameters (`@consistency`, `@timestamp`, ...), statements
+ * (plain CQL, `@prepare`, `@bind`, batches), DESCRIBE commands and HELP.
+ *
+ * Each parser first recognises a block with a permissive regular expression and
+ * then hands the matched text to an `extract*` method, which validates it
+ * against the strict pattern of the companion object and throws an
+ * `InterpreterException` describing the expected syntax on mismatch.
+ */
+class ParagraphParser extends RegexParsers{
+
+
+  import ParagraphParser._
+
+  // Only the leading '#' marker is removed; '#' characters inside the comment text are kept.
+  def singleLineComment: Parser[Comment] = """\s*#.*""".r ^^ { text =>
+    Comment(text.trim.replaceFirst("^#+", ""))
+  }
+
+  // Reluctant quantifier: the comment ends at the first closing delimiter, so
+  // statements located between two separate comments are not swallowed.
+  def multiLineComment: Parser[Comment] = """(?s)/\*(.*?)\*/""".r ^^ { text =>
+    Comment(text.trim.stripPrefix("/*").stripSuffix("*/"))
+  }
+
+  //Query parameters
+  def consistency: Parser[Consistency] = """\s*@consistency.+""".r ^^ { text => extractConsistency(text.trim) }
+  def serialConsistency: Parser[SerialConsistency] = """\s*@serialConsistency.+""".r ^^ { text => extractSerialConsistency(text.trim) }
+  def timestamp: Parser[Timestamp] = """\s*@timestamp.+""".r ^^ { text => extractTimestamp(text.trim) }
+  def retryPolicy: Parser[RetryPolicy] = """\s*@retryPolicy.+""".r ^^ { text => extractRetryPolicy(text.trim) }
+  def fetchSize: Parser[FetchSize] = """\s*@fetchSize.+""".r ^^ { text => extractFetchSize(text.trim) }
+
+  //Statements
+  def genericStatement: Parser[SimpleStm] = s"""$GENERIC_STATEMENT_PREFIX[^;]+;""".r ^^ { text => extractSimpleStatement(text.trim) }
+  def prepare: Parser[PrepareStm] = """\s*@prepare.+""".r ^^ { text => extractPreparedStatement(text.trim) }
+  def removePrepare: Parser[RemovePrepareStm] = """\s*@remove_prepare.+""".r ^^ { text => extractRemovePreparedStatement(text.trim) }
+  def bind: Parser[BoundStm] = """\s*@bind.+""".r ^^ { text => extractBoundStatement(text.trim) }
+
+
+  //Meta data
+  private def describeCluster: Parser[DescribeClusterCmd] = """(?i)\s*(?:DESCRIBE|DESC)\s+CLUSTER.*""".r ^^ {extractDescribeClusterCmd(_)}
+  private def describeKeyspaces: Parser[DescribeKeyspacesCmd] = """(?i)\s*(?:DESCRIBE|DESC)\s+KEYSPACES.*""".r ^^ {extractDescribeKeyspacesCmd(_)}
+  private def describeTables: Parser[DescribeTablesCmd] = """(?i)\s*(?:DESCRIBE|DESC)\s+TABLES.*""".r ^^ {extractDescribeTablesCmd(_)}
+  private def describeKeyspace: Parser[DescribeKeyspaceCmd] = """\s*(?i)(?:DESCRIBE|DESC)\s+KEYSPACE\s+.+""".r ^^ {extractDescribeKeyspaceCmd(_)}
+  private def describeTable: Parser[DescribeTableCmd] = """(?i)\s*(?:DESCRIBE|DESC)\s+TABLE\s+.+""".r ^^ {extractDescribeTableCmd(_)}
+  private def describeType: Parser[DescribeUDTCmd] = """(?i)\s*(?:DESCRIBE|DESC)\s+TYPE\s+.*""".r ^^ {extractDescribeTypeCmd(_)}
+
+  //Help
+  private def helpCommand: Parser[HelpCmd] = """(?i)\s*HELP.*""".r ^^{extractHelpCmd(_)}
+
+  //Batch
+  private def beginBatch: Parser[String] = """(?i)\s*BEGIN\s+(UNLOGGED|COUNTER)?\s*BATCH""".r
+  private def applyBatch: Parser[String] = """(?i)APPLY BATCH;""".r
+  private def insert: Parser[SimpleStm] = """(?i)INSERT [^;]+;""".r ^^{SimpleStm(_)}
+  private def update: Parser[SimpleStm] = """(?i)UPDATE [^;]+;""".r ^^{SimpleStm(_)}
+  private def delete: Parser[SimpleStm] = """(?i)DELETE [^;]+;""".r ^^{SimpleStm(_)}
+
+  private def mutationStatements: Parser[List[QueryStatement]] = rep(insert | update | delete | bind)
+
+  /** A batch: BEGIN ... BATCH, any number of mutations or bound statements, then APPLY BATCH; */
+  def batch: Parser[BatchStm] = beginBatch ~ mutationStatements ~ applyBatch ^^ {
+    case begin ~ cqls ~ end => BatchStm(extractBatchType(begin),cqls)}
+
+  /** Entry point: parses a whole paragraph; the alternatives are tried in the order listed. */
+  def queries:Parser[List[AnyBlock]] = rep(singleLineComment | multiLineComment | consistency | serialConsistency |
+    timestamp | retryPolicy | fetchSize | removePrepare | prepare | bind | batch | describeCluster | describeKeyspaces |
+    describeTables | describeKeyspace | describeTable | describeType | helpCommand | genericStatement)
+
+  def extractConsistency(text: String): Consistency = {
+    text match {
+      case CONSISTENCY_LEVEL_PATTERN(consistency) => Consistency(ConsistencyLevel.valueOf(consistency))
+      case _ => throw new InterpreterException(s"Invalid syntax for @consistency. " +
+        s"It should comply to the pattern ${CONSISTENCY_LEVEL_PATTERN.toString}")
+    }
+  }
+
+  def extractSerialConsistency(text: String): SerialConsistency = {
+    text match {
+      case SERIAL_CONSISTENCY_LEVEL_PATTERN(consistency) => SerialConsistency(ConsistencyLevel.valueOf(consistency))
+      case _ => throw new InterpreterException(s"Invalid syntax for @serialConsistency. " +
+        s"It should comply to the pattern ${SERIAL_CONSISTENCY_LEVEL_PATTERN.toString}")
+    }
+  }
+
+  def extractTimestamp(text: String): Timestamp = {
+    text match {
+      case TIMESTAMP_PATTERN(timestamp) => Timestamp(timestamp.trim.toLong)
+      case _ => throw new InterpreterException(s"Invalid syntax for @timestamp. " +
+        s"It should comply to the pattern ${TIMESTAMP_PATTERN.toString}")
+    }
+  }
+
+  def extractRetryPolicy(text: String): RetryPolicy = {
+    text match {
+      case RETRY_POLICIES_PATTERN(retry) => retry.trim match {
+        case DEFAULT_POLICY => DefaultRetryPolicy
+        case DOWNGRADING_CONSISTENCY_RETRY => DowngradingRetryPolicy
+        case FALLTHROUGH_RETRY => FallThroughRetryPolicy
+        case LOGGING_DEFAULT_RETRY => LoggingDefaultRetryPolicy
+        case LOGGING_DOWNGRADING_RETRY => LoggingDowngradingRetryPolicy
+        case LOGGING_FALLTHROUGH_RETRY => LoggingFallThroughRetryPolicy
+      }
+      case _ => throw new InterpreterException(s"Invalid syntax for @retryPolicy. " +
+        s"It should comply to the pattern ${RETRY_POLICIES_PATTERN.toString}")
+    }
+  }
+
+  def extractFetchSize(text: String): FetchSize = {
+    text match {
+      case FETCHSIZE_PATTERN(fetchSize) => FetchSize(fetchSize.trim.toInt)
+      case _ => throw new InterpreterException(s"Invalid syntax for @fetchSize. " +
+        s"It should comply to the pattern ${FETCHSIZE_PATTERN.toString}")
+    }
+  }
+
+  def extractSimpleStatement(text: String): SimpleStm = {
+    text match {
+      case SIMPLE_STATEMENT_PATTERN(statement) => SimpleStm(statement)
+      case _ => throw new InterpreterException(s"Invalid statement '$text'. Did you forget to add ; (semi-colon) at the end of each CQL statement ?")
+    }
+  }
+
+  def extractPreparedStatement(text: String): PrepareStm = {
+    text match {
+      case PREPARE_STATEMENT_PATTERN(name,queryString) => PrepareStm(name.trim,queryString.trim)
+      case _ => throw new InterpreterException(s"Invalid syntax for @prepare. " +
+        s"It should comply to the pattern: @prepare[prepared_statement_name]=CQL Statement (without semi-colon)")
+    }
+  }
+
+  def extractRemovePreparedStatement(text: String): RemovePrepareStm= {
+    text match {
+      case REMOVE_PREPARE_STATEMENT_PATTERN(name) => RemovePrepareStm(name.trim)
+      case _ => throw new InterpreterException(s"Invalid syntax for @remove_prepare. " +
+        s"It should comply to the pattern: @remove_prepare[prepared_statement_name]")
+    }
+  }
+
+  def extractBoundStatement(text: String): BoundStm = {
+    text match {
+      // The bound-values group is optional, so it may come back as null
+      case BIND_PATTERN(name,boundValues) => BoundStm(name.trim, Option(boundValues).map(_.trim).getOrElse(""))
+      case _ => throw new InterpreterException("Invalid syntax for @bind. It should comply to the pattern: " +
+        "@bind[prepared_statement_name]=10,'jdoe','John DOE',12345,'2015-07-32 12:04:23.234' " +
+        "OR @bind[prepared_statement_name] with no bound value. No semi-colon")
+    }
+  }
+
+  def extractBatchType(text: String): BatchStatement.Type = {
+    text match {
+      case BATCH_PATTERN(batchType) =>
+        // No UNLOGGED/COUNTER keyword means a LOGGED batch
+        val inferredType = Option(batchType).getOrElse("LOGGED")
+        BatchStatement.Type.valueOf(inferredType.toUpperCase)
+      case _ => throw new InterpreterException(s"Invalid syntax for BEGIN BATCH. " +
+        s"""It should comply to the pattern: ${BATCH_PATTERN.toString}""")
+    }
+  }
+
+  def extractDescribeClusterCmd(text: String): DescribeClusterCmd = {
+    text match {
+      case DESCRIBE_CLUSTER_PATTERN() => new DescribeClusterCmd
+      case _ => throw new InterpreterException(s"Invalid syntax for DESCRIBE CLUSTER. " +
+        s"""It should comply to the pattern: ${DESCRIBE_CLUSTER_PATTERN.toString}""")
+    }
+  }
+
+  def extractDescribeKeyspacesCmd(text: String): DescribeKeyspacesCmd = {
+    text match {
+      case DESCRIBE_KEYSPACES_PATTERN() => new DescribeKeyspacesCmd
+      case _ => throw new InterpreterException(s"Invalid syntax for DESCRIBE KEYSPACES. " +
+        s"""It should comply to the pattern: ${DESCRIBE_KEYSPACES_PATTERN.toString}""")
+    }
+  }
+
+  def extractDescribeTablesCmd(text: String): DescribeTablesCmd = {
+    text match {
+      case DESCRIBE_TABLES_PATTERN() => new DescribeTablesCmd
+      case _ => throw new InterpreterException(s"Invalid syntax for DESCRIBE TABLES. " +
+        s"""It should comply to the pattern: ${DESCRIBE_TABLES_PATTERN.toString}""")
+    }
+  }
+
+  def extractDescribeKeyspaceCmd(text: String): DescribeKeyspaceCmd = {
+    text match {
+      case DESCRIBE_KEYSPACE_PATTERN(keyspace) => new DescribeKeyspaceCmd(keyspace)
+      case _ => throw new InterpreterException(s"Invalid syntax for DESCRIBE KEYSPACE. " +
+        s"""It should comply to the pattern: ${DESCRIBE_KEYSPACE_PATTERN.toString}""")
+    }
+  }
+
+  def extractDescribeTableCmd(text: String): DescribeTableCmd = {
+    text match {
+      case DESCRIBE_TABLE_WITH_KEYSPACE_PATTERN(keyspace,table) => new DescribeTableCmd(Option(keyspace),table)
+      case DESCRIBE_TABLE_PATTERN(table) => new DescribeTableCmd(Option.empty,table)
+      case _ => throw new InterpreterException(s"Invalid syntax for DESCRIBE TABLE. " +
+        s"""It should comply to the patterns: ${DESCRIBE_TABLE_WITH_KEYSPACE_PATTERN.toString} or ${DESCRIBE_TABLE_PATTERN.toString}""")
+    }
+  }
+
+  def extractDescribeTypeCmd(text: String): DescribeUDTCmd = {
+    text match {
+      case DESCRIBE_TYPE_WITH_KEYSPACE_PATTERN(keyspace,table) => new DescribeUDTCmd(Option(keyspace),table)
+      case DESCRIBE_TYPE_PATTERN(table) => new DescribeUDTCmd(Option.empty,table)
+      case _ => throw new InterpreterException(s"Invalid syntax for DESCRIBE TYPE. " +
+        s"""It should comply to the patterns: ${DESCRIBE_TYPE_WITH_KEYSPACE_PATTERN.toString} or ${DESCRIBE_TYPE_PATTERN.toString}""")
+    }
+  }
+
+  def extractHelpCmd(text: String): HelpCmd = {
+    text match {
+      case HELP_PATTERN() => new HelpCmd
+      case _ => throw new InterpreterException(s"Invalid syntax for HELP. " +
+        s"""It should comply to the patterns: ${HELP_PATTERN.toString}""")
+    }
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b9583c6e/cassandra/src/main/scala/org/apache/zeppelin/cassandra/TextBlockHierarchy.scala
----------------------------------------------------------------------
diff --git a/cassandra/src/main/scala/org/apache/zeppelin/cassandra/TextBlockHierarchy.scala b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/TextBlockHierarchy.scala
new file mode 100644
index 0000000..70b2ce2
--- /dev/null
+++ b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/TextBlockHierarchy.scala
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.cassandra
+
+import com.datastax.driver.core._
+
+/**
+ * Object hierarchy produced when the input text of a paragraph is parsed:
+ * comments, query parameters and statements.
+ */
+object TextBlockHierarchy {
+
+  /** Coarse category of a parsed block. */
+  sealed trait BlockType
+  object ParameterBlock extends BlockType
+  object StatementBlock extends BlockType
+  object DescribeBlock extends BlockType
+  object CommentBlock extends BlockType
+
+  /** Root of every parsed block, tagged with its category. */
+  abstract class AnyBlock(val blockType: BlockType) {
+    /** Unchecked cast of this block to the requested block type. */
+    def get[U <: AnyBlock]: U = asInstanceOf[U]
+  }
+
+  case class Comment(text:String) extends AnyBlock(CommentBlock)
+
+  /** Kind of query parameter carried by a [[QueryParameters]] block. */
+  sealed trait ParameterType
+  object ConsistencyParam extends ParameterType
+  object SerialConsistencyParam extends ParameterType
+  object TimestampParam extends ParameterType
+  object RetryPolicyParam extends ParameterType
+  object FetchSizeParam extends ParameterType
+
+  /** Base of all query parameter blocks. */
+  abstract class QueryParameters(val paramType: ParameterType) extends AnyBlock(ParameterBlock) {
+    /** Unchecked cast of this parameter to the requested parameter type. */
+    def getParam[U <: QueryParameters]: U = asInstanceOf[U]
+  }
+
+  case class Consistency(value: ConsistencyLevel) extends QueryParameters(ConsistencyParam)
+
+  case class SerialConsistency(value: ConsistencyLevel) extends QueryParameters(SerialConsistencyParam)
+
+  case class Timestamp(value: Long) extends QueryParameters(TimestampParam)
+
+  case class FetchSize(value: Int) extends QueryParameters(FetchSizeParam)
+
+  /** Retry policy selection; one singleton per supported policy. */
+  abstract class RetryPolicy extends QueryParameters(RetryPolicyParam)
+
+  object DefaultRetryPolicy extends RetryPolicy
+  object DowngradingRetryPolicy extends RetryPolicy
+  object FallThroughRetryPolicy extends RetryPolicy
+  object LoggingDefaultRetryPolicy extends RetryPolicy
+  object LoggingDowngradingRetryPolicy extends RetryPolicy
+  object LoggingFallThroughRetryPolicy extends RetryPolicy
+
+  /** Kind of statement carried by a [[QueryStatement]] block. */
+  sealed trait StatementType
+  object PrepareStatementType extends StatementType
+  object RemovePrepareStatementType extends StatementType
+  object BoundStatementType extends StatementType
+  object SimpleStatementType extends StatementType
+  object BatchStatementType extends StatementType
+  object DescribeClusterStatementType extends StatementType
+  object DescribeAllKeyspacesStatementType extends StatementType
+  object DescribeKeyspaceStatementType extends StatementType
+  object DescribeAllTablesStatementType extends StatementType
+  object DescribeTableStatementType extends StatementType
+  object DescribeTypeStatementType extends StatementType
+  object HelpStatementType extends StatementType
+
+  /** Base of all statement blocks. */
+  abstract class QueryStatement(val statementType: StatementType) extends AnyBlock(StatementBlock) {
+    /** Unchecked cast of this statement to the requested statement type. */
+    def getStatement[U<: QueryStatement]: U = asInstanceOf[U]
+  }
+
+  case class SimpleStm(text:String) extends QueryStatement(SimpleStatementType)
+
+  case class PrepareStm(name: String, query:String) extends QueryStatement(PrepareStatementType)
+
+  case class RemovePrepareStm(name:String) extends QueryStatement(RemovePrepareStatementType)
+
+  case class BoundStm(name: String, values:String) extends QueryStatement(BoundStatementType)
+
+  case class BatchStm(batchType: BatchStatement.Type, statements: List[QueryStatement])
+    extends QueryStatement(BatchStatementType)
+
+  /** Mixed into DESCRIBE commands to expose their textual form. */
+  sealed trait DescribeCommandStatement {
+    val statement: String
+  }
+
+  class DescribeClusterCmd(override val statement: String = "DESCRIBE CLUSTER;")
+    extends QueryStatement(DescribeClusterStatementType) with DescribeCommandStatement
+
+  class DescribeKeyspacesCmd(override val statement: String = "DESCRIBE KEYSPACES;")
+    extends QueryStatement(DescribeAllKeyspacesStatementType) with DescribeCommandStatement
+
+  class DescribeTablesCmd(override val statement: String = "DESCRIBE TABLES;")
+    extends QueryStatement(DescribeAllTablesStatementType) with DescribeCommandStatement
+
+  case class DescribeKeyspaceCmd(keyspace: String) extends QueryStatement(DescribeKeyspaceStatementType)
+    with DescribeCommandStatement {
+    override val statement: String = s"DESCRIBE KEYSPACE $keyspace;"
+  }
+
+  case class DescribeTableCmd(keyspace:Option[String],table: String) extends QueryStatement(DescribeTableStatementType)
+    with DescribeCommandStatement {
+    // The table name is qualified with the keyspace only when one was given
+    override val statement: String =
+      keyspace.fold(s"DESCRIBE TABLE $table;")(ks => s"DESCRIBE TABLE $ks.$table;")
+  }
+
+  case class DescribeUDTCmd(keyspace:Option[String],udtName: String) extends QueryStatement(DescribeTypeStatementType)
+    with DescribeCommandStatement {
+    // The type name is qualified with the keyspace only when one was given
+    override val statement: String =
+      keyspace.fold(s"DESCRIBE TYPE $udtName;")(ks => s"DESCRIBE TYPE $ks.$udtName;")
+  }
+
+  class HelpCmd extends QueryStatement(HelpStatementType)
+
+}