You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by mo...@apache.org on 2016/06/11 01:31:58 UTC
[2/2] zeppelin git commit: [ZEPPELIN-901] Cassandra interpreter V3
[ZEPPELIN-901] Cassandra interpreter V3
### What is this PR for?
**Cassandra** interpreter V3:
* Update documentation
* Update interactive documentation
* Add support for binary protocol **V4**
* Implement new `requestTimeOut` runtime option
* Upgrade Java driver version to **3.0.1**
* Allow interpreter to add dynamic forms programmatically when using `FormType.SIMPLE`
* Allow dynamic form using default Zeppelin syntax
* Fixing typo on `FallThroughPolicy`
* Look for data in `AngularObjectRegistry` before creating dynamic form
* Add missing support for `ALTER` statements
### What type of PR is it?
[Improvement]
### Todos
* [ ] - Test steps executed and confirmed working by the community
### What is the Jira issue?
**[ZEPPELIN-901]**
### How should this be tested?
* Download and install locally **[Cassandra 3.5]**
* Start **Cassandra**
* Clone this pull request locally with:
* `git fetch origin pull/950/head:CassandraInterpreterV3`
* `git checkout CassandraInterpreterV3`
* Build this version of **Zeppelin** with `mvn clean package -DskipTests`
* Start Zeppelin and update the property `cassandra.hosts` of the **Cassandra** interpreter (set it to _localhost_ or _127.0.0.1_ depending on your configuration)
* To test the `ALTER` statement, create a new paragraph with the following content:
```
%cassandra
CREATE KEYSPACE IF NOT EXISTS cassandra_v3 WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1};
CREATE TABLE IF NOT EXISTS cassandra_v3.simple(key int PRIMARY KEY);
ALTER TABLE cassandra_v3.simple ADD value text;
DESCRIBE TABLE cassandra_v3.simple;
```
The result of this paragraph should be:
![image](https://cloud.githubusercontent.com/assets/1532977/15740044/6062bba4-28b3-11e6-818e-e257d0830041.png)
* To test the support for **Zeppelin** dynamic form syntax, create a paragraph with the following content:
```
%cassandra
INSERT INTO cassandra_v3.simple(key,value) VALUES(1, 'first');
INSERT INTO cassandra_v3.simple(key,value) VALUES(2, 'second');
INSERT INTO cassandra_v3.simple(key,value) VALUES(3, 'third');
SELECT * FROM cassandra_v3.simple WHERE key=${key=1};
```
The result of this paragraph should be:
![image](https://cloud.githubusercontent.com/assets/1532977/15740146/d7b696f8-28b3-11e6-80c5-428ae97558f6.png)
* To test the legacy syntax for dynamic form, create a new paragraph with the following content:
```
%cassandra
INSERT INTO cassandra_v3.simple(key,value) VALUES(1, 'first');
INSERT INTO cassandra_v3.simple(key,value) VALUES(2, 'second');
INSERT INTO cassandra_v3.simple(key,value) VALUES(3, 'third');
SELECT * FROM cassandra_v3.simple WHERE key={{key=1|2|3}};
```
The result of this paragraph should be:
![image](https://cloud.githubusercontent.com/assets/1532977/15740377/ebb018ea-28b4-11e6-9cde-b3aad50bbaef.png)
* To test the override of `AngularObject` upon dynamic form, create 2 new paragraphs with the following content
First paragraph content:
```
%angular
<form class="form-inline">
<div class="form-group">
<label for="keyId">Key: </label>
<input type="text" class="form-control" id="keyId" placeholder="key id ..." ng-model="key_id"></input>
</div>
<button type="submit" class="btn btn-primary" ng-click="z.angularBind('key_id',key_id,'PUT_HERE_SECOND_PARAGRAPH_ID'); z.runParagraph('PUT_HERE_SECOND_PARAGRAPH_ID')"> Bind</button>
</form>
```
Second paragraph content:
```
%cassandra
INSERT INTO cassandra_v3.simple(key,value) VALUES(1, 'first');
INSERT INTO cassandra_v3.simple(key,value) VALUES(2, 'second');
INSERT INTO cassandra_v3.simple(key,value) VALUES(3, 'third');
SELECT * FROM cassandra_v3.simple WHERE key={{key_id=1}};
```
Replace `PUT_HERE_SECOND_PARAGRAPH_ID` in the angular code of the first paragraph by the real paragraph id of the second paragraph. Execute the first paragraph, put a value into the input text (3) and click on **Bind**, you should see the following result:
![image](https://cloud.githubusercontent.com/assets/1532977/15740626/e9d9deba-28b5-11e6-9708-f040a8d58ccd.png)
### Screenshots (if appropriate)
### Questions:
* Does the licenses files need update? --> **No**
* Is there breaking changes for older versions? --> **No**
* Does this needs documentation? --> **Yes**
[ZEPPELIN-901]: https://issues.apache.org/jira/browse/ZEPPELIN-901
[Cassandra 3.5]: http://cassandra.apache.org/download/
Author: DuyHai DOAN <do...@gmail.com>
Closes #950 from doanduyhai/CassandraInterpreterV3 and squashes the following commits:
abd42e3 [DuyHai DOAN] [ZEPPELIN-901] Update documentation after code review
8e39846 [DuyHai DOAN] [ZEPPELIN-901] Update documentation
e12400e [DuyHai DOAN] [ZEPPELIN-901] Update interactive documentation
b434cc1 [DuyHai DOAN] [ZEPPELIN-901] Add support for binary protocol V4
9d45bba [DuyHai DOAN] [ZEPPELIN-901] Implement new @requestTimeOut runtime option
e27741f [DuyHai DOAN] [ZEPPELIN-901] Upgrade Java driver version to 3.0.1
c85d928 [DuyHai DOAN] [ZEPPELIN-901] Allow interpreter to add dynamic forms programmatically when using FormType.SIMPLE
6a05749 [DuyHai DOAN] [ZEPPELIN-901] Allow dynamic form using default Zeppelin syntax
449e42c [DuyHai DOAN] [ZEPPELIN-901] Fixing typo on FallThroughPolicy
4914c69 [DuyHai DOAN] [ZEPPELIN-901] Look for data in AngularObjectRegistry before creating dynamic form
8e49a05 [DuyHai DOAN] [ZEPPELIN-901] Add missing support for ALTER statements
Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/68b9b00f
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/68b9b00f
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/68b9b00f
Branch: refs/heads/master
Commit: 68b9b00fcd81b254ba56affeac5e0f2e2e2e509c
Parents: 19e8ed9
Author: DuyHai DOAN <do...@gmail.com>
Authored: Thu Jun 2 20:07:17 2016 +0200
Committer: Lee moon soo <mo...@apache.org>
Committed: Fri Jun 10 18:33:26 2016 -0700
----------------------------------------------------------------------
cassandra/pom.xml | 4 +-
.../cassandra/CassandraInterpreter.java | 6 +-
.../src/main/resources/scalate/helpMenu.ssp | 126 ++++++++++++------
.../zeppelin/cassandra/InterpreterLogic.scala | 54 ++++++--
.../zeppelin/cassandra/JavaDriverConfig.scala | 10 ++
.../zeppelin/cassandra/ParagraphParser.scala | 14 +-
.../zeppelin/cassandra/TextBlockHierarchy.scala | 3 +
.../cassandra/CassandraInterpreterTest.java | 21 ++-
.../cassandra/InterpreterLogicTest.java | 37 ++++++
cassandra/src/test/resources/scalate/Help.html | 2 +-
.../cassandra/ParagraphParserTest.scala | 17 +++
.../img/docs-img/cassandra-InterpreterName.png | Bin 26762 -> 0 bytes
.../docs-img/cassandra-NewInterpreterInList.png | Bin 6518 -> 0 bytes
.../cassandra-NewInterpreterInstance.png | Bin 23552 -> 0 bytes
docs/interpreter/cassandra.md | 128 +++++++++++--------
.../interpreter/remote/RemoteInterpreter.java | 19 ++-
16 files changed, 325 insertions(+), 116 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/68b9b00f/cassandra/pom.xml
----------------------------------------------------------------------
diff --git a/cassandra/pom.xml b/cassandra/pom.xml
index d573193..46d2530 100644
--- a/cassandra/pom.xml
+++ b/cassandra/pom.xml
@@ -30,12 +30,12 @@
<artifactId>zeppelin-cassandra</artifactId>
<packaging>jar</packaging>
<version>0.6.0-SNAPSHOT</version>
- <name>Zeppelin: Cassandra</name>
+ <name>Zeppelin: Apache Cassandra interpreter</name>
<description>Zeppelin cassandra support</description>
<url>http://zeppelin.apache.org</url>
<properties>
- <cassandra.driver.version>3.0.0-rc1</cassandra.driver.version>
+ <cassandra.driver.version>3.0.1</cassandra.driver.version>
<snappy.version>1.0.5.4</snappy.version>
<lz4.version>1.3.0</lz4.version>
<scala.version>2.10.4</scala.version>
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/68b9b00f/cassandra/src/main/java/org/apache/zeppelin/cassandra/CassandraInterpreter.java
----------------------------------------------------------------------
diff --git a/cassandra/src/main/java/org/apache/zeppelin/cassandra/CassandraInterpreter.java b/cassandra/src/main/java/org/apache/zeppelin/cassandra/CassandraInterpreter.java
index cc4520d..ca77aba 100644
--- a/cassandra/src/main/java/org/apache/zeppelin/cassandra/CassandraInterpreter.java
+++ b/cassandra/src/main/java/org/apache/zeppelin/cassandra/CassandraInterpreter.java
@@ -110,7 +110,7 @@ public class CassandraInterpreter extends Interpreter {
public static final String DEFAULT_PORT = "9042";
public static final String DEFAULT_CLUSTER = "Test Cluster";
public static final String DEFAULT_KEYSPACE = "system";
- public static final String DEFAULT_PROTOCOL_VERSION = "3";
+ public static final String DEFAULT_PROTOCOL_VERSION = "4";
public static final String DEFAULT_COMPRESSION = "NONE";
public static final String DEFAULT_CREDENTIAL = "none";
public static final String DEFAULT_POLICY = "DEFAULT";
@@ -159,7 +159,7 @@ public class CassandraInterpreter extends Interpreter {
"IP address). Default = localhost. Ex: '192.168.0.12,node2,node3'")
.add(CASSANDRA_PORT, DEFAULT_PORT, "Cassandra native port. Default = 9042")
.add(CASSANDRA_PROTOCOL_VERSION, DEFAULT_PROTOCOL_VERSION,
- "Cassandra protocol version. Default = 3")
+ "Cassandra protocol version. Default = 4")
.add(CASSANDRA_CLUSTER_NAME, DEFAULT_CLUSTER, "Cassandra cluster name. " +
"Default = 'Test Cluster'")
.add(CASSANDRA_KEYSPACE_NAME, DEFAULT_KEYSPACE, "Cassandra keyspace name. " +
@@ -311,7 +311,7 @@ public class CassandraInterpreter extends Interpreter {
@Override
public FormType getFormType() {
- return FormType.NATIVE;
+ return FormType.SIMPLE;
}
@Override
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/68b9b00f/cassandra/src/main/resources/scalate/helpMenu.ssp
----------------------------------------------------------------------
diff --git a/cassandra/src/main/resources/scalate/helpMenu.ssp b/cassandra/src/main/resources/scalate/helpMenu.ssp
index 26ff117..c61a68f 100644
--- a/cassandra/src/main/resources/scalate/helpMenu.ssp
+++ b/cassandra/src/main/resources/scalate/helpMenu.ssp
@@ -103,12 +103,12 @@
<ul class="dropdown-menu">
<li>
<a role="button">
- <span class="text-info">Version <strong>2.0</strong></span>
+ <span class="text-info">Version <strong>3.0</strong></span>
</a>
</li>
<li>
<a role="button">
- <span class="text-info">Java Driver Version <strong>3.0.0-rc1</strong></span>
+ <span class="text-info">Java Driver Version <strong>3.0.1</strong></span>
</a>
</li>
<li>
@@ -215,6 +215,14 @@
<tr><th>Cassandra version</th><th>Documentation</th></tr>
</thead>
<tbody>
+ <tr>
+ <td><strong>3.x</strong></td>
+ <td>
+ <a href="http://docs.datastax.com/en/cql/3.3/cql/cqlIntro.html" target="_blank">
+ http://docs.datastax.com/en/cql/3.3/cql/cqlIntro.html
+ </a>
+ </td>
+ </tr>
<tr>
<td><strong>2.2</strong></td>
<td>
@@ -464,6 +472,11 @@
<td><strong>@fetchSize=<em>int value</em></strong></td>
<td>Apply the given fetch size to all queries in the paragraph</td>
</tr>
+ <tr>
+ <td>Request Timeout</td>
+ <td><strong>@requestTimeOut=<em>int value</em></strong></td>
+ <td>Apply the given request timeout <strong>in millisecs</strong> to all queries in the paragraph</td>
+ </tr>
</tbody>
</table>
<br/>
@@ -506,6 +519,10 @@
<td>Fetch Size</td>
<td>Any integer value</td>
</tr>
+ <tr>
+ <td>Request Timeout</td>
+ <td>Any integer value</td>
+ </tr>
</tbody>
</table>
<br/>
@@ -526,19 +543,19 @@
);
TRUNCATE spark_demo.ts;
- # Timestamp in the past
+ // Timestamp in the past
@timestamp=10
- # Force timestamp directly in the first insert
+ // Force timestamp directly in the first insert
INSERT INTO spark_demo.ts(key,value) VALUES(1,'first insert') USING TIMESTAMP 100;
- # Select some data to make the clock turn
+ // Select some data to make the clock turn
SELECT * FROM spark_demo.albums LIMIT 100;
- # Now insert using the timestamp parameter set at the beginning(10)
+ // Now insert using the timestamp parameter set at the beginning(10)
INSERT INTO spark_demo.ts(key,value) VALUES(1,'second insert');
- # Check for the result. You should see 'first insert'
+ // Check for the result. You should see 'first insert'
SELECT value FROM spark_demo.ts WHERE key=1;
</pre>
</div>
@@ -599,13 +616,13 @@
<div class="col-md-10 col-md-offset-1">
<pre>
- @prepare[statement_name]=...
+ @prepare[statement-name]=...
- @bind[statement_name]=\u2019text\u2019, 1223, \u20192015-07-30 12:00:01\u2019, null, true, [\u2018list_item1\u2019, \u2019list_item2\u2019]
+ @bind[statement-name]=\u2019text\u2019, 1223, \u20192015-07-30 12:00:01\u2019, null, true, [\u2018list_item1\u2019, \u2019list_item2\u2019]
- @bind[statement_name_with_no_bound_value]
+ @bind[statement-name-with-no-bound-value]
- @remove_prepare[statement_name]
+ @remove_prepare[statement-name]
</pre>
</div>
@@ -615,10 +632,10 @@
<h3>II @prepare</h3>
<br/>
<p>
- You can use the syntax "<strong>@prepare[statement_name]=SELECT ...</strong>" to create a prepared statement.
- The <em>statement_name</em> is mandatory because the interpreter prepares the given statement with the
+ You can use the syntax "<strong>@prepare[statement-name]=SELECT ...</strong>" to create a prepared statement.
+ The <em>statement-name</em> is mandatory because the interpreter prepares the given statement with the
Java driver and saves the generated prepared statement in an internal map, using the provided
- <em>statement_name</em> as search key.
+ <em>statement-name</em> as search key.
<br/><br/>
<div class="alert alert-info">
Please note that this internal prepared statement map is shared with <strong>all notebooks</strong>
@@ -626,7 +643,7 @@
</div>
<br/>
<div class="alert alert-warning">
- If the interpreter encounters many @prepare for the <strong>same statement_name</strong> (key),
+ If the interpreter encounters many @prepare for the <strong>same statement-name</strong> (key),
only the <strong>first</strong> statement will be taken into account.
</div>
<br/>
@@ -645,7 +662,7 @@
<br/>
For the above example, the prepared statement is <strong>"SELECT * FROM spark_demo.albums LIMIT ?"</strong>.
- <em>"SELECT * FROM spark_demo.artists LIMIT ?"</em> is ignored because an entry already exists in the
+ <strong>"SELECT * FROM spark_demo.artists LIMIT ?"</strong> is ignored because an entry already exists in the
prepared statements map with the key <strong>select</strong>.
<br/><br/>
In the context of Zeppelin, a notebook can be scheduled to be executed at regular interval,
@@ -712,7 +729,7 @@
<h3>IV @remove_prepare</h3>
<br/>
<p>
- To avoid for a prepared statement to stay forever in the prepared statement map, you can use the <strong>@remove_prepare[statement_name]</strong> syntax
+ To avoid for a prepared statement to stay forever in the prepared statement map, you can use the <strong>@remove_prepare[statement-name]</strong> syntax
to remove it. Removing a non-existing prepared statement yields no error.
</p>
</div>
@@ -735,16 +752,22 @@
<div class="panel panel-default">
<div class="panel-body">
<p>
- Instead of hard-coding your CQL queries, it is possible to use the mustache syntax (<strong>{{ }}</strong>)
- to inject simple value or multiple choices forms.
+ Instead of hard-coding your CQL queries, it is possible to use <strong>
+ <a href="http://zeppelin.apache.org/docs/0.6.0-SNAPSHOT/manual/dynamicform.html" target="_blank">Zeppelin dynamic form</a>
+ </strong> syntax to inject simple value or multiple choices forms.
+
+ The legacy mustache syntax ( <strong>{{ }}</strong> ) to bind input text and select form is still supported but is deprecated and will be removed in future releases.
+
<br/><br/>
+ <h6> -- Legacy syntax -- </h6>
The syntax for simple parameter is: <strong>{{input_Label=default value}}</strong>.
The default value is mandatory because the first time the paragraph is executed,
we launch the CQL query before rendering the form so at least one value should be provided.
<br/><br/>
The syntax for multiple choices parameter is: <strong>{{input_Label=value1 | value2 | \u2026 | valueN }}</strong>.
By default the first choice is used for CQL query the first time the paragraph is executed.
+ <h6> -- End legacy syntax -- </h6>
<br/><br/>
Example:
<br/>
@@ -755,8 +778,8 @@
#Secondary index on performer style
SELECT name, country, performer
FROM spark_demo.performers
- WHERE name='{{performer=Sheryl Crow|Doof|Fanfarlo|Los Paranoia}}'
- AND styles CONTAINS '{{style=Rock}}';
+ WHERE name='\${performer=Sheryl Crow|Doof|Fanfarlo|Los Paranoia}'
+ AND styles CONTAINS '\${style=Rock}';
</pre>
</div>
@@ -766,13 +789,13 @@
In the above example, the first CQL query will be executed for <em>performer='Sheryl Crow'</em>
AND <em>style='Rock'</em>. For subsequent queries, you can change the value directly using the form.
Please note that we enclosed the {{ }} block between simple quotes (') because Cassandra expects a String here.
- We could have also use the <strong>{{style='Rock'}}</strong> syntax but this time, the value
+ We could have also use the <strong>\${style='Rock'}</strong> syntax but this time, the value
displayed on the form is <em>'Rock'</em> and not <em>Rock</em>.
<br/><br/>
<div class="alert alert-info">
It is also possible to use dynamic forms for <strong>prepared statements</strong>: <br/>
- <strong>@bind[select]=='{{performer=Sheryl Crow|Doof|Fanfarlo|Los Paranoia}}', '{{style=Rock}}'</strong>
+ <strong>@bind[select]=='\${performer=Sheryl Crow|Doof|Fanfarlo|Los Paranoia}', '\${style=Rock}'</strong>
</div>
</pre>
</p>
@@ -892,7 +915,7 @@
</tr>
<tr>
<td>cassandra.protocol.version</td>
- <td><strong>3</strong></td>
+ <td><strong>4</strong></td>
</tr>
<tr>
<td>cassandra.query.default.consistency</td>
@@ -947,20 +970,35 @@
<div id="${sharedStatesId}" class="panel-collapse collapse" role="tabpanel">
<div class="panel-body">
It is possible to execute many paragraphs in parallel. However, at the back-end side, we\u2019re still using synchronous queries. <em>Asynchronous execution</em> is only possible when it is possible to return a <strong>Future</strong> value in the <strong>InterpreterResult</strong>. It may be an interesting proposal for the <strong>Zeppelin</strong> project.
- <br/>
- Another caveat is that the same <strong>com.datastax.driver.core.Session</strong> object is used for <strong>all</strong> notebooks and paragraphs. Consequently, if you use the <em>USE keyspace name;</em> statement to log into a keyspace, it will change the keyspace for <strong>all current users</strong> of the Cassandra interpreter because we only create 1 <strong>com.datastax.driver.core.Session</strong> object per instance of <strong>Cassandra</strong> interpreter.
- <br/>
- The same remark does apply to the <strong>prepared statement hash map</strong>, it is shared by <strong>all users</strong> using the same instance of <strong>Cassandra</strong> interpreter.
+ <br/><br/>
+ Recently, <strong>Zeppelin</strong> allows you to choose the level of isolation for your interpreters (see
+ <strong><a href="http://zeppelin.apache.org/docs/0.6.0-SNAPSHOT/manual/interpreters.html" target="_blank">Interpreter Binding Mode</a></strong> ).
+ <br/><br/>
+ Long story short, you have 3 available bindings:
+
+ <ul>
+ <li><strong>shared</strong> : <em>same JVM</em> and <em>same Interpreter instance</em> for all notes</li>
+ <li><strong>scoped</strong> : <em>same JVM</em> but <em>different Interpreter instances</em>, one for each note</li>
+ <li><strong>isolated</strong> : <em>different JVM</em> running a <em>single Interpreter instance</em>, one JVM for each note</li>
+ </ul>
+
<br/>
- Until <strong>Zeppelin</strong> offers a real multi-users separation, there is a work-around to segregate user environment and states: <em>create different Cassandra interpreter instances</em>
- <br/>
- <ol>
- <li>First go to the <strong>Interpreter</strong> menu and click on the <strong>Create</strong> button</li>
- <li>In the interpreter creation form, put <strong>cass-instance2</strong> as <strong>Name</strong> and select the <strong>cassandra</strong> in the interpreter drop-down list</li>
- <li>Click on <strong>Save</strong> to create the new interpreter instance. Now you should be able to see it in the interpreter list</li>
- <li>Go back to your notebook and click on the <strong>Gear</strong> icon to configure interpreter bindings. You should be able to see and select the <strong>cass-instance2</strong> interpreter instance in the available interpreter list instead of the standard <strong>cassandra</strong> instance</li>
- </ol>
- </div>
+ Using the <strong>shared</strong> binding, the same <code>com.datastax.driver.core.Session</code> object is used for all notes and paragraphs.
+ Consequently, if you use the <strong>USE keyspace name;</strong> statement to log into a keyspace,
+ it will change the keyspace for all current users of the Cassandra interpreter because we only create 1
+ <code>com.datastax.driver.core.Session</code> object per instance of Cassandra interpreter.
+
+ <br/><br/>
+ The same remark does apply to the <strong>prepared statement hash map</strong>, it is shared by all users using the same instance of Cassandra interpreter.
+ <br/><br/>
+ When using <strong>scoped</strong> binding, in the <em>same JVM</em> <strong>Zeppelin</strong> will create multiple instances of the Cassandra interpreter,
+ thus multiple <code>com.datastax.driver.core.Session</code> objects.
+ <strong>Beware of resource and memory usage using this binding !</strong>
+ <br/><br/>
+ The <strong>isolated</strong> mode is the most extreme and will create as many JVM/<code>com.datastax.driver.core.Session</code> object as there are distinct notes.
+
+
+ </div>
</div>
</div>
@@ -974,6 +1012,20 @@
</div>
<div id="${changelogId}" class="panel-collapse collapse" role="tabpanel">
<div class="panel-body">
+ <strong>3.0</strong> :
+ <br/>
+ <ul>
+ <li>Update documentation</li>
+ <li>Update interactive documentation</li>
+ <li>Add support for binary protocol <strong>V4</strong></li>
+ <li>Implement new <code>@requestTimeOut</code> runtime option</li>
+ <li>Upgrade Java driver version to <strong>3.0.1</strong></li>
+ <li>Allow interpreter to add dynamic forms programmatically when using FormType.SIMPLE</li>
+ <li>Allow dynamic form using default Zeppelin syntax</li>
+ <li>Fixing typo on FallThroughPolicy</li>
+ <li>Look for data in AngularObjectRegistry before creating dynamic form</li>
+ <li>Add missing support for <code>ALTER</code> statements</li>
+ </ul>
<strong>2.0</strong> :
<br/>
<ul>
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/68b9b00f/cassandra/src/main/scala/org/apache/zeppelin/cassandra/InterpreterLogic.scala
----------------------------------------------------------------------
diff --git a/cassandra/src/main/scala/org/apache/zeppelin/cassandra/InterpreterLogic.scala b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/InterpreterLogic.scala
index 707c16a8..363da7b 100644
--- a/cassandra/src/main/scala/org/apache/zeppelin/cassandra/InterpreterLogic.scala
+++ b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/InterpreterLogic.scala
@@ -29,6 +29,7 @@ import com.datastax.driver.core._
import com.datastax.driver.core.exceptions.DriverException
import com.datastax.driver.core.policies.{LoggingRetryPolicy, FallthroughRetryPolicy, DowngradingConsistencyRetryPolicy, Policies}
import org.apache.zeppelin.cassandra.TextBlockHierarchy._
+import org.apache.zeppelin.display.AngularObjectRegistry
import org.apache.zeppelin.display.Input.ParamOption
import org.apache.zeppelin.interpreter.InterpreterResult.Code
import org.apache.zeppelin.interpreter.{InterpreterException, InterpreterResult, InterpreterContext}
@@ -41,17 +42,20 @@ import scala.collection.mutable.ArrayBuffer
/**
* Value object to store runtime query parameters
- * @param consistency consistency level
+ *
+ * @param consistency consistency level
* @param serialConsistency serial consistency level
* @param timestamp timestamp
* @param retryPolicy retry policy
* @param fetchSize query fetch size
+ * @param requestTimeOut request time out in millisecs
*/
case class CassandraQueryOptions(consistency: Option[ConsistencyLevel],
serialConsistency:Option[ConsistencyLevel],
timestamp: Option[Long],
retryPolicy: Option[RetryPolicy],
- fetchSize: Option[Int])
+ fetchSize: Option[Int],
+ requestTimeOut: Option[Int])
/**
* Singleton object to store constants
@@ -71,7 +75,7 @@ object InterpreterLogic {
val fallThroughRetryPolicy = FallthroughRetryPolicy.INSTANCE
val loggingDefaultRetryPolicy = new LoggingRetryPolicy(defaultRetryPolicy)
val loggingDownGradingRetryPolicy = new LoggingRetryPolicy(downgradingConsistencyRetryPolicy)
- val loggingFallThrougRetryPolicy = new LoggingRetryPolicy(fallThroughRetryPolicy)
+ val loggingFallThroughRetryPolicy = new LoggingRetryPolicy(fallThroughRetryPolicy)
val preparedStatements : mutable.Map[String,PreparedStatement] = new ConcurrentHashMap[String,PreparedStatement]().asScala
@@ -273,7 +277,13 @@ class InterpreterLogic(val session: Session) {
.flatMap(x => Option(x.value))
.headOption
- CassandraQueryOptions(consistency,serialConsistency, timestamp, retryPolicy, fetchSize)
+ val requestTimeOut: Option[Int] = parameters
+ .filter(_.paramType == RequestTimeOutParam)
+ .map(_.getParam[RequestTimeOut])
+ .flatMap(x => Option(x.value))
+ .headOption
+
+ CassandraQueryOptions(consistency,serialConsistency, timestamp, retryPolicy, fetchSize, requestTimeOut)
}
def generateSimpleStatement(st: SimpleStm, options: CassandraQueryOptions,context: InterpreterContext): SimpleStatement = {
@@ -305,19 +315,38 @@ class InterpreterLogic(val session: Session) {
def maybeExtractVariables(statement: String, context: InterpreterContext): String = {
+ def findInAngularRepository(variable: String): Option[AnyRef] = {
+ val registry = context.getAngularObjectRegistry
+ val noteId = context.getNoteId
+ val paragraphId = context.getParagraphId
+ val paragraphScoped: Option[AnyRef] = Option(registry.get(variable, noteId, paragraphId)).map[AnyRef](_.get())
+
+ paragraphScoped
+ }
+
def extractVariableAndDefaultValue(statement: String, exp: String):String = {
exp match {
- case MULTIPLE_CHOICES_VARIABLE_DEFINITION_PATTERN(variable,choices) => {
+ case MULTIPLE_CHOICES_VARIABLE_DEFINITION_PATTERN(variable, choices) => {
val escapedExp: String = exp.replaceAll( """\{""", """\\{""").replaceAll( """\}""", """\\}""").replaceAll("""\|""","""\\|""")
- val listChoices:List[String] = choices.trim.split(CHOICES_SEPARATOR).toList
- val paramOptions= listChoices.map(choice => new ParamOption(choice, choice))
- val selected = context.getGui.select(variable, listChoices.head, paramOptions.toArray)
- statement.replaceAll(escapedExp,selected.toString)
+ findInAngularRepository(variable) match {
+ case Some(value) => statement.replaceAll(escapedExp,value.toString)
+ case None => {
+ val listChoices:List[String] = choices.trim.split(CHOICES_SEPARATOR).toList
+ val paramOptions= listChoices.map(choice => new ParamOption(choice, choice))
+ val selected = context.getGui.select(variable, listChoices.head, paramOptions.toArray)
+ statement.replaceAll(escapedExp,selected.toString)
+ }
+ }
}
case SIMPLE_VARIABLE_DEFINITION_PATTERN(variable,defaultVal) => {
val escapedExp: String = exp.replaceAll( """\{""", """\\{""").replaceAll( """\}""", """\\}""")
- val value = context.getGui.input(variable,defaultVal)
- statement.replaceAll(escapedExp,value.toString)
+ findInAngularRepository(variable) match {
+ case Some(value) => statement.replaceAll(escapedExp,value.toString)
+ case None => {
+ val value = context.getGui.input(variable,defaultVal)
+ statement.replaceAll(escapedExp,value.toString)
+ }
+ }
}
case _ => throw new ParsingException(s"Invalid bound variable definition for '$exp' in '$statement'. It should be of form 'variable=defaultValue' or 'variable=value1|value2|...|valueN'")
}
@@ -336,10 +365,11 @@ class InterpreterLogic(val session: Session) {
case FallThroughRetryPolicy => statement.setRetryPolicy(fallThroughRetryPolicy)
case LoggingDefaultRetryPolicy => statement.setRetryPolicy(loggingDefaultRetryPolicy)
case LoggingDowngradingRetryPolicy => statement.setRetryPolicy(loggingDownGradingRetryPolicy)
- case LoggingFallThroughRetryPolicy => statement.setRetryPolicy(loggingFallThrougRetryPolicy)
+ case LoggingFallThroughRetryPolicy => statement.setRetryPolicy(loggingFallThroughRetryPolicy)
case _ => throw new InterpreterException(s"""Unknown retry policy ${options.retryPolicy.getOrElse("???")}""")
}
options.fetchSize.foreach(statement.setFetchSize(_))
+ options.requestTimeOut.foreach(statement.setReadTimeoutMillis(_))
}
private def createBoundStatement(codecRegistry: CodecRegistry, name: String, ps: PreparedStatement, rawBoundValues: String): BoundStatement = {
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/68b9b00f/cassandra/src/main/scala/org/apache/zeppelin/cassandra/JavaDriverConfig.scala
----------------------------------------------------------------------
diff --git a/cassandra/src/main/scala/org/apache/zeppelin/cassandra/JavaDriverConfig.scala b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/JavaDriverConfig.scala
index d64ad90..5b2dbef 100644
--- a/cassandra/src/main/scala/org/apache/zeppelin/cassandra/JavaDriverConfig.scala
+++ b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/JavaDriverConfig.scala
@@ -207,6 +207,16 @@ class JavaDriverConfig {
DEFAULT_MAX_REQUEST_PER_CONNECTION_LOCAL = "1024"
DEFAULT_MAX_REQUEST_PER_CONNECTION_REMOTE = "256"
return ProtocolVersion.V3
+ case "4" =>
+ DEFAULT_MAX_CONNECTION_PER_HOST_LOCAL = "1"
+ DEFAULT_MAX_CONNECTION_PER_HOST_REMOTE = "1"
+ DEFAULT_CORE_CONNECTION_PER_HOST_LOCAL = "1"
+ DEFAULT_CORE_CONNECTION_PER_HOST_REMOTE = "1"
+ DEFAULT_NEW_CONNECTION_THRESHOLD_LOCAL = "800"
+ DEFAULT_NEW_CONNECTION_THRESHOLD_REMOTE = "200"
+ DEFAULT_MAX_REQUEST_PER_CONNECTION_LOCAL = "1024"
+ DEFAULT_MAX_REQUEST_PER_CONNECTION_REMOTE = "256"
+ return ProtocolVersion.V4
case _ =>
DEFAULT_MAX_CONNECTION_PER_HOST_LOCAL = "1"
DEFAULT_MAX_CONNECTION_PER_HOST_REMOTE = "1"
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/68b9b00f/cassandra/src/main/scala/org/apache/zeppelin/cassandra/ParagraphParser.scala
----------------------------------------------------------------------
diff --git a/cassandra/src/main/scala/org/apache/zeppelin/cassandra/ParagraphParser.scala b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/ParagraphParser.scala
index e2cb64d..29c013f 100644
--- a/cassandra/src/main/scala/org/apache/zeppelin/cassandra/ParagraphParser.scala
+++ b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/ParagraphParser.scala
@@ -44,6 +44,7 @@ object ParagraphParser {
LOGGING_DEFAULT_RETRY, LOGGING_DOWNGRADING_RETRY, LOGGING_FALLTHROUGH_RETRY)
.mkString("""^\s*@retryPolicy\s*=\s*(""", "|" , """)\s*$""").r
val FETCHSIZE_PATTERN = """^\s*@fetchSize\s*=\s*([0-9]+)\s*$""".r
+ val REQUEST_TIMEOUT_PATTERN = """^\s*@requestTimeOut\s*=\s*([0-9]+)\s*$""".r
val SIMPLE_STATEMENT_PATTERN = """([^;]+;)""".r
val PREPARE_STATEMENT_PATTERN = """^\s*@prepare\[([^]]+)\]\s*=\s*([^;]+)$""".r
@@ -69,7 +70,7 @@ object ParagraphParser {
val UDF_PATTERN = """(?is)\s*(CREATE(?:\s+OR REPLACE)?\s+FUNCTION(?:\s+IF\s+NOT\s+EXISTS)?.+?(?:\s+|\n|\r|\f)AS(?:\s+|\n|\r|\f)(?:'|\$\$).+?(?:'|\$\$)\s*;)""".r
val GENERIC_STATEMENT_PREFIX =
- """(?is)\s*(?:INSERT|UPDATE|DELETE|SELECT|CREATE|UPDATE|
+ """(?is)\s*(?:INSERT|UPDATE|DELETE|SELECT|CREATE|ALTER|
|DROP|GRANT|REVOKE|TRUNCATE|LIST|USE)\s+""".r
val VALID_IDENTIFIER = "[a-z][a-z0-9_]*"
@@ -146,6 +147,7 @@ class ParagraphParser extends RegexParsers{
def timestamp: Parser[Timestamp] = """\s*@timestamp.+""".r ^^ {case x => extractTimestamp(x.trim)}
def retryPolicy: Parser[RetryPolicy] = """\s*@retryPolicy.+""".r ^^ {case x => extractRetryPolicy(x.trim)}
def fetchSize: Parser[FetchSize] = """\s*@fetchSize.+""".r ^^ {case x => extractFetchSize(x.trim)}
+ def requestTimeOut: Parser[RequestTimeOut] = """\s*@requestTimeOut.+""".r ^^ {case x => extractRequestTimeOut(x.trim)}
//Statements
def createFunctionStatement: Parser[SimpleStm] = UDF_PATTERN ^^{case x => extractUdfStatement(x.trim)}
@@ -188,7 +190,7 @@ class ParagraphParser extends RegexParsers{
case begin ~ cqls ~ end => BatchStm(extractBatchType(begin),cqls)}
def queries:Parser[List[AnyBlock]] = rep(singleLineComment | multiLineComment | consistency | serialConsistency |
- timestamp | retryPolicy | fetchSize | removePrepare | prepare | bind | batch | describeCluster |
+ timestamp | retryPolicy | fetchSize | requestTimeOut | removePrepare | prepare | bind | batch | describeCluster |
describeKeyspace | describeKeyspaces |
describeTable | describeTables |
describeType | describeTypes |
@@ -244,6 +246,14 @@ class ParagraphParser extends RegexParsers{
}
}
+ def extractRequestTimeOut(text: String): RequestTimeOut = {
+ text match {
+ case REQUEST_TIMEOUT_PATTERN(requestTimeOut) => RequestTimeOut(requestTimeOut.trim.toInt)
+ case _ => throw new InterpreterException(s"Invalid syntax for @requestTimeOut. " +
+ s"It should comply to the pattern ${REQUEST_TIMEOUT_PATTERN.toString}")
+ }
+ }
+
def extractSimpleStatement(text: String): SimpleStm = {
text match {
case SIMPLE_STATEMENT_PATTERN(statement) => SimpleStm(statement)
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/68b9b00f/cassandra/src/main/scala/org/apache/zeppelin/cassandra/TextBlockHierarchy.scala
----------------------------------------------------------------------
diff --git a/cassandra/src/main/scala/org/apache/zeppelin/cassandra/TextBlockHierarchy.scala b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/TextBlockHierarchy.scala
index 61a2d8d..be55564 100644
--- a/cassandra/src/main/scala/org/apache/zeppelin/cassandra/TextBlockHierarchy.scala
+++ b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/TextBlockHierarchy.scala
@@ -44,6 +44,7 @@ object TextBlockHierarchy {
object TimestampParam extends ParameterType
object RetryPolicyParam extends ParameterType
object FetchSizeParam extends ParameterType
+ object RequestTimeOutParam extends ParameterType
abstract class QueryParameters(val paramType: ParameterType) extends AnyBlock(ParameterBlock) {
@@ -60,6 +61,8 @@ object TextBlockHierarchy {
case class FetchSize(value: Int) extends QueryParameters(FetchSizeParam)
+ case class RequestTimeOut(value: Int) extends QueryParameters(RequestTimeOutParam)
+
abstract class RetryPolicy extends QueryParameters(RetryPolicyParam)
object DefaultRetryPolicy extends RetryPolicy
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/68b9b00f/cassandra/src/test/java/org/apache/zeppelin/cassandra/CassandraInterpreterTest.java
----------------------------------------------------------------------
diff --git a/cassandra/src/test/java/org/apache/zeppelin/cassandra/CassandraInterpreterTest.java b/cassandra/src/test/java/org/apache/zeppelin/cassandra/CassandraInterpreterTest.java
index 560c57e..db3c391 100644
--- a/cassandra/src/test/java/org/apache/zeppelin/cassandra/CassandraInterpreterTest.java
+++ b/cassandra/src/test/java/org/apache/zeppelin/cassandra/CassandraInterpreterTest.java
@@ -35,6 +35,7 @@ import com.datastax.driver.core.Session;
import info.archinnov.achilles.embedded.CassandraEmbeddedServerBuilder;
+import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterResult;
@@ -45,7 +46,6 @@ import org.mockito.Answers;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
-import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
@@ -63,7 +63,7 @@ public class CassandraInterpreterTest {
.withScript("prepare_data.cql")
.withProtocolVersion(ProtocolVersion.V3)
.buildNativeSessionOnly();
-// public static Session session = null;
+
private static CassandraInterpreter interpreter;
@Mock(answer = Answers.RETURNS_DEEP_STUBS)
@@ -73,7 +73,7 @@ public class CassandraInterpreterTest {
public static void setUp() {
Properties properties = new Properties();
final Cluster cluster = session.getCluster();
-// final Cluster cluster = null;
+
properties.setProperty(CASSANDRA_CLUSTER_NAME, cluster.getClusterName());
properties.setProperty(CASSANDRA_COMPRESSION_PROTOCOL, "NONE");
properties.setProperty(CASSANDRA_CREDENTIALS_USERNAME, "none");
@@ -290,6 +290,19 @@ public class CassandraInterpreterTest {
}
@Test
+ public void should_execute_statement_with_request_timeout() throws Exception {
+ //Given
+ String statement = "@requestTimeOut=10000000\n" +
+ "SELECT * FROM zeppelin.artists;";
+
+ //When
+ final InterpreterResult actual = interpreter.interpret(statement, intrContext);
+
+ //Then
+ assertThat(actual.code()).isEqualTo(Code.SUCCESS);
+ }
+
+ @Test
public void should_execute_prepared_and_bound_statements() throws Exception {
//Given
String queries = "@prepare[ps]=INSERT INTO zeppelin.prepared(key,val) VALUES(?,?)\n" +
@@ -354,6 +367,8 @@ public class CassandraInterpreterTest {
@Test
public void should_extract_variable_from_statement() throws Exception {
//Given
+ AngularObjectRegistry angularObjectRegistry = new AngularObjectRegistry("cassandra", null);
+ when(intrContext.getAngularObjectRegistry()).thenReturn(angularObjectRegistry);
when(intrContext.getGui().input("login", "hsue")).thenReturn("hsue");
when(intrContext.getGui().input("age", "27")).thenReturn("27");
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/68b9b00f/cassandra/src/test/java/org/apache/zeppelin/cassandra/InterpreterLogicTest.java
----------------------------------------------------------------------
diff --git a/cassandra/src/test/java/org/apache/zeppelin/cassandra/InterpreterLogicTest.java b/cassandra/src/test/java/org/apache/zeppelin/cassandra/InterpreterLogicTest.java
index 77b5cd8..698397a 100644
--- a/cassandra/src/test/java/org/apache/zeppelin/cassandra/InterpreterLogicTest.java
+++ b/cassandra/src/test/java/org/apache/zeppelin/cassandra/InterpreterLogicTest.java
@@ -31,6 +31,8 @@ import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.SimpleStatement;
import com.datastax.driver.core.Statement;
+
+import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.display.Input.ParamOption;
import org.apache.zeppelin.interpreter.InterpreterContext;
@@ -101,6 +103,8 @@ public class InterpreterLogicTest {
@Test
public void should_extract_variable_and_default_value() throws Exception {
//Given
+ AngularObjectRegistry angularObjectRegistry = new AngularObjectRegistry("cassandra", null);
+ when(intrContext.getAngularObjectRegistry()).thenReturn(angularObjectRegistry);
when(intrContext.getGui().input("table", "zeppelin.demo")).thenReturn("zeppelin.demo");
when(intrContext.getGui().input("id", "'John'")).thenReturn("'John'");
@@ -114,6 +118,8 @@ public class InterpreterLogicTest {
@Test
public void should_extract_variable_and_choices() throws Exception {
//Given
+ AngularObjectRegistry angularObjectRegistry = new AngularObjectRegistry("cassandra", null);
+ when(intrContext.getAngularObjectRegistry()).thenReturn(angularObjectRegistry);
when(intrContext.getGui().select(eq("name"), eq("'Paul'"), optionsCaptor.capture())).thenReturn("'Jack'");
//When
@@ -142,6 +148,23 @@ public class InterpreterLogicTest {
}
@Test
+ public void should_extract_variable_from_angular_object_registry() throws Exception {
+ //Given
+ AngularObjectRegistry angularObjectRegistry = new AngularObjectRegistry("cassandra", null);
+ angularObjectRegistry.add("id", "from_angular_registry", "noteId", "paragraphId");
+ when(intrContext.getAngularObjectRegistry()).thenReturn(angularObjectRegistry);
+ when(intrContext.getNoteId()).thenReturn("noteId");
+ when(intrContext.getParagraphId()).thenReturn("paragraphId");
+
+ //When
+ final String actual = helper.maybeExtractVariables("SELECT * FROM zeppelin.demo WHERE id='{{id=John}}'", intrContext);
+
+ //Then
+ assertThat(actual).isEqualTo("SELECT * FROM zeppelin.demo WHERE id='from_angular_registry'");
+ verify(intrContext, never()).getGui();
+ }
+
+ @Test
public void should_error_if_incorrect_variable_definition() throws Exception {
//Given
@@ -204,6 +227,18 @@ public class InterpreterLogicTest {
}
@Test
+ public void should_extract_request_timeout_option() throws Exception {
+ //Given
+ List<QueryParameters> options = Arrays.<QueryParameters>asList(new RequestTimeOut(100));
+
+ //When
+ final CassandraQueryOptions actual = helper.extractQueryOptions(toScalaList(options));
+
+ //Then
+ assertThat(actual.requestTimeOut().get()).isEqualTo(100);
+ }
+
+ @Test
public void should_generate_simple_statement() throws Exception {
//Given
String input = "SELECT * FROM users LIMIT 10;";
@@ -211,6 +246,7 @@ public class InterpreterLogicTest {
Option.<ConsistencyLevel>empty(),
Option.empty(),
Option.<RetryPolicy>empty(),
+ Option.empty(),
Option.empty());
//When
@@ -232,6 +268,7 @@ public class InterpreterLogicTest {
Option.<ConsistencyLevel>empty(),
Option.empty(),
Option.<RetryPolicy>empty(),
+ Option.empty(),
Option.empty());
//When