You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2018/02/21 04:07:44 UTC
[1/2] zeppelin git commit: ZEPPELIN-3151. Fixed Checkstyle errors and
warnings in cassandra module
Repository: zeppelin
Updated Branches:
refs/heads/master b335caed3 -> 8bb888b49
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8bb888b4/cassandra/src/test/java/org/apache/zeppelin/cassandra/InterpreterLogicTest.java
----------------------------------------------------------------------
diff --git a/cassandra/src/test/java/org/apache/zeppelin/cassandra/InterpreterLogicTest.java b/cassandra/src/test/java/org/apache/zeppelin/cassandra/InterpreterLogicTest.java
index f3848fd..e096a0c 100644
--- a/cassandra/src/test/java/org/apache/zeppelin/cassandra/InterpreterLogicTest.java
+++ b/cassandra/src/test/java/org/apache/zeppelin/cassandra/InterpreterLogicTest.java
@@ -16,27 +16,23 @@
*/
package org.apache.zeppelin.cassandra;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
+
+import static java.util.Arrays.asList;
+
import static com.datastax.driver.core.BatchStatement.Type.UNLOGGED;
import static com.datastax.driver.core.ConsistencyLevel.ALL;
import static com.datastax.driver.core.ConsistencyLevel.LOCAL_SERIAL;
import static com.datastax.driver.core.ConsistencyLevel.ONE;
import static com.datastax.driver.core.ConsistencyLevel.QUORUM;
import static com.datastax.driver.core.ConsistencyLevel.SERIAL;
-import static java.util.Arrays.asList;
-import static org.assertj.core.api.Assertions.*;
-import static org.mockito.Mockito.*;
-import com.datastax.driver.core.BatchStatement;
-import com.datastax.driver.core.ConsistencyLevel;
-import com.datastax.driver.core.Session;
-import com.datastax.driver.core.SimpleStatement;
-import com.datastax.driver.core.Statement;
-
-import org.apache.zeppelin.display.AngularObjectRegistry;
-import org.apache.zeppelin.display.GUI;
-import org.apache.zeppelin.display.ui.OptionInput.ParamOption;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterException;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
@@ -47,302 +43,334 @@ import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
-import scala.Option;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Date;
import java.util.List;
-import org.apache.zeppelin.cassandra.TextBlockHierarchy.*;
+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.SimpleStatement;
+import com.datastax.driver.core.Statement;
+
+import scala.Option;
+
+import org.apache.zeppelin.cassandra.TextBlockHierarchy.AnyBlock;
+import org.apache.zeppelin.cassandra.TextBlockHierarchy.Consistency;
+import org.apache.zeppelin.cassandra.TextBlockHierarchy.DowngradingRetryPolicy$;
+import org.apache.zeppelin.cassandra.TextBlockHierarchy.LoggingDefaultRetryPolicy$;
+import org.apache.zeppelin.cassandra.TextBlockHierarchy.QueryParameters;
+import org.apache.zeppelin.cassandra.TextBlockHierarchy.RequestTimeOut;
+import org.apache.zeppelin.cassandra.TextBlockHierarchy.RetryPolicy;
+import org.apache.zeppelin.cassandra.TextBlockHierarchy.SerialConsistency;
+import org.apache.zeppelin.cassandra.TextBlockHierarchy.SimpleStm;
+import org.apache.zeppelin.cassandra.TextBlockHierarchy.Timestamp;
+import org.apache.zeppelin.display.AngularObjectRegistry;
+import org.apache.zeppelin.display.GUI;
+import org.apache.zeppelin.display.ui.OptionInput.ParamOption;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterException;
@RunWith(MockitoJUnitRunner.class)
public class InterpreterLogicTest {
-
- @Rule
- public ExpectedException expectedException = ExpectedException.none();
-
- @Mock(answer = Answers.RETURNS_DEEP_STUBS)
- private InterpreterContext intrContext;
-
- @Mock
- private Session session;
-
- final InterpreterLogic helper = new InterpreterLogic(session);
-
- @Captor
- ArgumentCaptor<ParamOption[]> optionsCaptor;
-
- @Test
- public void should_parse_input_string_block() throws Exception {
- //Given
- String input = "SELECT * FROM users LIMIT 10;";
-
- //When
- final List<AnyBlock> anyBlocks = this.<AnyBlock>toJavaList(helper.parseInput(input));
-
- //Then
- assertThat(anyBlocks).hasSize(1);
- assertThat(anyBlocks.get(0)).isInstanceOf(SimpleStm.class);
- }
-
- @Test
- public void should_exception_while_parsing_input() throws Exception {
- //Given
- String input = "SELECT * FROM users LIMIT 10";
-
- //When
- expectedException.expect(InterpreterException.class);
- expectedException.expectMessage("Error parsing input:\n" +
- "\t'SELECT * FROM users LIMIT 10'\n" +
- "Did you forget to add ; (semi-colon) at the end of each CQL statement ?");
-
- helper.parseInput(input);
- }
-
- @Test
- public void should_extract_variable_and_default_value() throws Exception {
- //Given
- AngularObjectRegistry angularObjectRegistry = new AngularObjectRegistry("cassandra", null);
- when(intrContext.getAngularObjectRegistry()).thenReturn(angularObjectRegistry);
- when(intrContext.getGui().input("table", "zeppelin.demo")).thenReturn("zeppelin.demo");
- when(intrContext.getGui().input("id", "'John'")).thenReturn("'John'");
-
- //When
- final String actual = helper.maybeExtractVariables("SELECT * FROM {{table=zeppelin.demo}} WHERE id={{id='John'}}", intrContext);
-
- //Then
- assertThat(actual).isEqualTo("SELECT * FROM zeppelin.demo WHERE id='John'");
- }
-
- @Test
- public void should_extract_variable_and_choices() throws Exception {
- //Given
- AngularObjectRegistry angularObjectRegistry = new AngularObjectRegistry("cassandra", null);
- when(intrContext.getAngularObjectRegistry()).thenReturn(angularObjectRegistry);
- when(intrContext.getGui().select(eq("name"), eq("'Paul'"), optionsCaptor.capture())).thenReturn("'Jack'");
-
- //When
- final String actual = helper.maybeExtractVariables("SELECT * FROM zeppelin.artists WHERE name={{name='Paul'|'Jack'|'Smith'}}", intrContext);
-
- //Then
- assertThat(actual).isEqualTo("SELECT * FROM zeppelin.artists WHERE name='Jack'");
- final List<ParamOption> paramOptions = asList(optionsCaptor.getValue());
- assertThat(paramOptions.get(0).getValue()).isEqualTo("'Paul'");
- assertThat(paramOptions.get(1).getValue()).isEqualTo("'Jack'");
- assertThat(paramOptions.get(2).getValue()).isEqualTo("'Smith'");
- }
-
- @Test
- public void should_extract_no_variable() throws Exception {
- //Given
- GUI gui = mock(GUI.class);
- when(intrContext.getGui()).thenReturn(gui);
-
- //When
- final String actual = helper.maybeExtractVariables("SELECT * FROM zeppelin.demo", intrContext);
-
- //Then
- verifyZeroInteractions(gui);
- assertThat(actual).isEqualTo("SELECT * FROM zeppelin.demo");
- }
-
- @Test
- public void should_extract_variable_from_angular_object_registry() throws Exception {
- //Given
- AngularObjectRegistry angularObjectRegistry = new AngularObjectRegistry("cassandra", null);
- angularObjectRegistry.add("id", "from_angular_registry", "noteId", "paragraphId");
- when(intrContext.getAngularObjectRegistry()).thenReturn(angularObjectRegistry);
- when(intrContext.getNoteId()).thenReturn("noteId");
- when(intrContext.getParagraphId()).thenReturn("paragraphId");
-
- //When
- final String actual = helper.maybeExtractVariables("SELECT * FROM zeppelin.demo WHERE id='{{id=John}}'", intrContext);
-
- //Then
- assertThat(actual).isEqualTo("SELECT * FROM zeppelin.demo WHERE id='from_angular_registry'");
- verify(intrContext, never()).getGui();
- }
-
- @Test
- public void should_error_if_incorrect_variable_definition() throws Exception {
- //Given
-
- //When
- expectedException.expect(ParsingException.class);
- expectedException.expectMessage("Invalid bound variable definition for '{{table?zeppelin.demo}}' in 'SELECT * FROM {{table?zeppelin.demo}} WHERE id={{id='John'}}'. It should be of form 'variable=defaultValue'");
-
- //Then
- helper.maybeExtractVariables("SELECT * FROM {{table?zeppelin.demo}} WHERE id={{id='John'}}", intrContext);
- }
-
-
- @Test
- public void should_extract_consistency_option() throws Exception {
- //Given
- List<QueryParameters> options = Arrays.<QueryParameters>asList(new Consistency(ALL), new Consistency(ONE));
-
- //When
- final CassandraQueryOptions actual = helper.extractQueryOptions(toScalaList(options));
-
- //Then
- assertThat(actual.consistency().get()).isEqualTo(ALL);
- }
-
-
- @Test
- public void should_extract_serial_consistency_option() throws Exception {
- //Given
- List<QueryParameters> options = Arrays.<QueryParameters>asList(new SerialConsistency(SERIAL), new SerialConsistency(LOCAL_SERIAL));
-
- //When
- final CassandraQueryOptions actual = helper.extractQueryOptions(toScalaList(options));
-
- //Then
- assertThat(actual.serialConsistency().get()).isEqualTo(SERIAL);
- }
-
- @Test
- public void should_extract_timestamp_option() throws Exception {
- //Given
- List<QueryParameters> options = Arrays.<QueryParameters>asList(new Timestamp(123L), new Timestamp(456L));
-
- //When
- final CassandraQueryOptions actual = helper.extractQueryOptions(toScalaList(options));
-
- //Then
- assertThat(actual.timestamp().get()).isEqualTo(123L);
- }
-
- @Test
- public void should_extract_retry_policy_option() throws Exception {
- //Given
- List<QueryParameters> options = Arrays.<QueryParameters>asList(DowngradingRetryPolicy$.MODULE$, LoggingDefaultRetryPolicy$.MODULE$);
-
- //When
- final CassandraQueryOptions actual = helper.extractQueryOptions(toScalaList(options));
-
- //Then
- assertThat(actual.retryPolicy().get()).isSameAs(DowngradingRetryPolicy$.MODULE$);
- }
-
- @Test
- public void should_extract_request_timeout_option() throws Exception {
- //Given
- List<QueryParameters> options = Arrays.<QueryParameters>asList(new RequestTimeOut(100));
-
- //When
- final CassandraQueryOptions actual = helper.extractQueryOptions(toScalaList(options));
-
- //Then
- assertThat(actual.requestTimeOut().get()).isEqualTo(100);
- }
-
- @Test
- public void should_generate_simple_statement() throws Exception {
- //Given
- String input = "SELECT * FROM users LIMIT 10;";
- CassandraQueryOptions options = new CassandraQueryOptions(Option.apply(QUORUM),
- Option.<ConsistencyLevel>empty(),
- Option.empty(),
- Option.<RetryPolicy>empty(),
- Option.empty(),
- Option.empty());
-
- //When
- final SimpleStatement actual = helper.generateSimpleStatement(new SimpleStm(input), options, intrContext);
-
- //Then
- assertThat(actual).isNotNull();
- assertThat(actual.getQueryString()).isEqualTo("SELECT * FROM users LIMIT 10;");
- assertThat(actual.getConsistencyLevel()).isSameAs(QUORUM);
- }
-
- @Test
- public void should_generate_batch_statement() throws Exception {
- //Given
- Statement st1 = new SimpleStatement("SELECT * FROM users LIMIT 10;");
- Statement st2 = new SimpleStatement("INSERT INTO users(id) VALUES(10);");
- Statement st3 = new SimpleStatement("UPDATE users SET name = 'John DOE' WHERE id=10;");
- CassandraQueryOptions options = new CassandraQueryOptions(Option.apply(QUORUM),
- Option.<ConsistencyLevel>empty(),
- Option.empty(),
- Option.<RetryPolicy>empty(),
- Option.empty(),
- Option.empty());
-
- //When
- BatchStatement actual = helper.generateBatchStatement(UNLOGGED, options, toScalaList(asList(st1, st2, st3)));
-
- //Then
- assertThat(actual).isNotNull();
- final List<Statement> statements = new ArrayList<>(actual.getStatements());
- assertThat(statements).hasSize(3);
- assertThat(statements.get(0)).isSameAs(st1);
- assertThat(statements.get(1)).isSameAs(st2);
- assertThat(statements.get(2)).isSameAs(st3);
- assertThat(actual.getConsistencyLevel()).isSameAs(QUORUM);
- }
-
- @Test
- public void should_parse_bound_values() throws Exception {
- //Given
- String bs="'jdoe',32,'John DOE',null, true, '2014-06-12 34:00:34'";
-
- //When
- final List<String> actual = this.<String>toJavaList(helper.parseBoundValues("ps", bs));
-
- //Then
- assertThat(actual).containsExactly("'jdoe'", "32", "'John DOE'",
- "null", "true", "2014-06-12 34:00:34");
- }
-
- @Test
- public void should_parse_simple_date() throws Exception {
- //Given
- String dateString = "2015-07-30 12:00:01";
-
- //When
- final Date actual = helper.parseDate(dateString);
-
- //Then
- Calendar calendar = Calendar.getInstance();
- calendar.setTime(actual);
-
- assertThat(calendar.get(Calendar.YEAR)).isEqualTo(2015);
- assertThat(calendar.get(Calendar.MONTH)).isEqualTo(Calendar.JULY);
- assertThat(calendar.get(Calendar.DAY_OF_MONTH)).isEqualTo(30);
- assertThat(calendar.get(Calendar.HOUR_OF_DAY)).isEqualTo(12);
- assertThat(calendar.get(Calendar.MINUTE)).isEqualTo(0);
- assertThat(calendar.get(Calendar.SECOND)).isEqualTo(1);
- }
-
- @Test
- public void should_parse_accurate_date() throws Exception {
- //Given
- String dateString = "2015-07-30 12:00:01.123";
-
- //When
- final Date actual = helper.parseDate(dateString);
-
- //Then
- Calendar calendar = Calendar.getInstance();
- calendar.setTime(actual);
-
- assertThat(calendar.get(Calendar.YEAR)).isEqualTo(2015);
- assertThat(calendar.get(Calendar.MONTH)).isEqualTo(Calendar.JULY);
- assertThat(calendar.get(Calendar.DAY_OF_MONTH)).isEqualTo(30);
- assertThat(calendar.get(Calendar.HOUR_OF_DAY)).isEqualTo(12);
- assertThat(calendar.get(Calendar.MINUTE)).isEqualTo(0);
- assertThat(calendar.get(Calendar.SECOND)).isEqualTo(1);
- assertThat(calendar.get(Calendar.MILLISECOND)).isEqualTo(123);
- }
-
- private <A> scala.collection.immutable.List<A> toScalaList(java.util.List<A> list) {
- return scala.collection.JavaConversions.collectionAsScalaIterable(list).toList();
- }
-
- private <A> java.util.List<A> toJavaList(scala.collection.immutable.List<A> list){
- return scala.collection.JavaConversions.seqAsJavaList(list);
- }
-}
\ No newline at end of file
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+
+ @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+ private InterpreterContext intrContext;
+
+ @Mock
+ private Session session;
+
+ final InterpreterLogic helper = new InterpreterLogic(session);
+
+ @Captor
+ ArgumentCaptor<ParamOption[]> optionsCaptor;
+
+ @Test
+ public void should_parse_input_string_block() throws Exception {
+ //Given
+ String input = "SELECT * FROM users LIMIT 10;";
+
+ //When
+ final List<AnyBlock> anyBlocks = this.<AnyBlock>toJavaList(helper.parseInput(input));
+
+ //Then
+ assertThat(anyBlocks).hasSize(1);
+ assertThat(anyBlocks.get(0)).isInstanceOf(SimpleStm.class);
+ }
+
+ @Test
+ public void should_exception_while_parsing_input() throws Exception {
+ //Given
+ String input = "SELECT * FROM users LIMIT 10";
+
+ //When
+ expectedException.expect(InterpreterException.class);
+ expectedException.expectMessage("Error parsing input:\n" +
+ "\t'SELECT * FROM users LIMIT 10'\n" +
+ "Did you forget to add ; (semi-colon) at the end of each CQL statement ?");
+
+ helper.parseInput(input);
+ }
+
+ @Test
+ public void should_extract_variable_and_default_value() throws Exception {
+ //Given
+ AngularObjectRegistry angularObjectRegistry = new AngularObjectRegistry("cassandra", null);
+ when(intrContext.getAngularObjectRegistry()).thenReturn(angularObjectRegistry);
+ when(intrContext.getGui().input("table", "zeppelin.demo")).thenReturn("zeppelin.demo");
+ when(intrContext.getGui().input("id", "'John'")).thenReturn("'John'");
+
+ //When
+ final String actual = helper.maybeExtractVariables(
+ "SELECT * FROM {{table=zeppelin.demo}} WHERE id={{id='John'}}", intrContext);
+
+ //Then
+ assertThat(actual).isEqualTo("SELECT * FROM zeppelin.demo WHERE id='John'");
+ }
+
+ @Test
+ public void should_extract_variable_and_choices() throws Exception {
+ //Given
+ AngularObjectRegistry angularObjectRegistry = new AngularObjectRegistry("cassandra", null);
+ when(intrContext.getAngularObjectRegistry()).thenReturn(angularObjectRegistry);
+ when(intrContext.getGui().select(eq("name"), eq("'Paul'"), optionsCaptor.capture()))
+ .thenReturn("'Jack'");
+
+ //When
+ final String actual = helper.maybeExtractVariables(
+ "SELECT * FROM zeppelin.artists WHERE name={{name='Paul'|'Jack'|'Smith'}}",
+ intrContext);
+
+ //Then
+ assertThat(actual).isEqualTo("SELECT * FROM zeppelin.artists WHERE name='Jack'");
+ final List<ParamOption> paramOptions = asList(optionsCaptor.getValue());
+ assertThat(paramOptions.get(0).getValue()).isEqualTo("'Paul'");
+ assertThat(paramOptions.get(1).getValue()).isEqualTo("'Jack'");
+ assertThat(paramOptions.get(2).getValue()).isEqualTo("'Smith'");
+ }
+
+ @Test
+ public void should_extract_no_variable() throws Exception {
+ //Given
+ GUI gui = mock(GUI.class);
+ when(intrContext.getGui()).thenReturn(gui);
+
+ //When
+ final String actual = helper.maybeExtractVariables("SELECT * FROM zeppelin.demo", intrContext);
+
+ //Then
+ verifyZeroInteractions(gui);
+ assertThat(actual).isEqualTo("SELECT * FROM zeppelin.demo");
+ }
+
+ @Test
+ public void should_extract_variable_from_angular_object_registry() throws Exception {
+ //Given
+ AngularObjectRegistry angularObjectRegistry = new AngularObjectRegistry("cassandra", null);
+ angularObjectRegistry.add("id", "from_angular_registry", "noteId", "paragraphId");
+ when(intrContext.getAngularObjectRegistry()).thenReturn(angularObjectRegistry);
+ when(intrContext.getNoteId()).thenReturn("noteId");
+ when(intrContext.getParagraphId()).thenReturn("paragraphId");
+
+ //When
+ final String actual = helper.maybeExtractVariables(
+ "SELECT * FROM zeppelin.demo WHERE id='{{id=John}}'", intrContext);
+
+ //Then
+ assertThat(actual).isEqualTo("SELECT * FROM zeppelin.demo WHERE id='from_angular_registry'");
+ verify(intrContext, never()).getGui();
+ }
+
+ @Test
+ public void should_error_if_incorrect_variable_definition() throws Exception {
+ //Given
+
+ //When
+ expectedException.expect(ParsingException.class);
+ expectedException.expectMessage("Invalid bound variable definition for " +
+ "'{{table?zeppelin.demo}}' in 'SELECT * FROM {{table?zeppelin.demo}} " +
+ "WHERE id={{id='John'}}'. It should be of form 'variable=defaultValue'");
+
+ //Then
+ helper.maybeExtractVariables("SELECT * FROM {{table?zeppelin.demo}} WHERE id={{id='John'}}",
+ intrContext);
+ }
+
+ @Test
+ public void should_extract_consistency_option() throws Exception {
+ //Given
+ List<QueryParameters> options = Arrays.<QueryParameters>asList(new Consistency(ALL),
+ new Consistency(ONE));
+
+ //When
+ final CassandraQueryOptions actual = helper.extractQueryOptions(toScalaList(options));
+
+ //Then
+ assertThat(actual.consistency().get()).isEqualTo(ALL);
+ }
+
+ @Test
+ public void should_extract_serial_consistency_option() throws Exception {
+ //Given
+ List<QueryParameters> options = Arrays.<QueryParameters>asList(new SerialConsistency(SERIAL),
+ new SerialConsistency(LOCAL_SERIAL));
+
+ //When
+ final CassandraQueryOptions actual = helper.extractQueryOptions(toScalaList(options));
+
+ //Then
+ assertThat(actual.serialConsistency().get()).isEqualTo(SERIAL);
+ }
+
+ @Test
+ public void should_extract_timestamp_option() throws Exception {
+ //Given
+ List<QueryParameters> options = Arrays.<QueryParameters>asList(new Timestamp(123L),
+ new Timestamp(456L));
+
+ //When
+ final CassandraQueryOptions actual = helper.extractQueryOptions(toScalaList(options));
+
+ //Then
+ assertThat(actual.timestamp().get()).isEqualTo(123L);
+ }
+
+ @Test
+ public void should_extract_retry_policy_option() throws Exception {
+ //Given
+ List<QueryParameters> options = Arrays.<QueryParameters>asList(DowngradingRetryPolicy$.MODULE$,
+ LoggingDefaultRetryPolicy$.MODULE$);
+
+ //When
+ final CassandraQueryOptions actual = helper.extractQueryOptions(toScalaList(options));
+
+ //Then
+ assertThat(actual.retryPolicy().get()).isSameAs(DowngradingRetryPolicy$.MODULE$);
+ }
+
+ @Test
+ public void should_extract_request_timeout_option() throws Exception {
+ //Given
+ List<QueryParameters> options = Arrays.<QueryParameters>asList(new RequestTimeOut(100));
+
+ //When
+ final CassandraQueryOptions actual = helper.extractQueryOptions(toScalaList(options));
+
+ //Then
+ assertThat(actual.requestTimeOut().get()).isEqualTo(100);
+ }
+
+ @Test
+ public void should_generate_simple_statement() throws Exception {
+ //Given
+ String input = "SELECT * FROM users LIMIT 10;";
+ CassandraQueryOptions options = new CassandraQueryOptions(Option.apply(QUORUM),
+ Option.<ConsistencyLevel>empty(),
+ Option.empty(),
+ Option.<RetryPolicy>empty(),
+ Option.empty(),
+ Option.empty());
+
+ //When
+ final SimpleStatement actual = helper.generateSimpleStatement(new SimpleStm(input), options,
+ intrContext);
+
+ //Then
+ assertThat(actual).isNotNull();
+ assertThat(actual.getQueryString()).isEqualTo("SELECT * FROM users LIMIT 10;");
+ assertThat(actual.getConsistencyLevel()).isSameAs(QUORUM);
+ }
+
+ @Test
+ public void should_generate_batch_statement() throws Exception {
+ //Given
+ Statement st1 = new SimpleStatement("SELECT * FROM users LIMIT 10;");
+ Statement st2 = new SimpleStatement("INSERT INTO users(id) VALUES(10);");
+ Statement st3 = new SimpleStatement("UPDATE users SET name = 'John DOE' WHERE id=10;");
+ CassandraQueryOptions options = new CassandraQueryOptions(Option.apply(QUORUM),
+ Option.<ConsistencyLevel>empty(),
+ Option.empty(),
+ Option.<RetryPolicy>empty(),
+ Option.empty(),
+ Option.empty());
+
+ //When
+ BatchStatement actual = helper.generateBatchStatement(UNLOGGED, options,
+ toScalaList(asList(st1, st2, st3)));
+
+ //Then
+ assertThat(actual).isNotNull();
+ final List<Statement> statements = new ArrayList<>(actual.getStatements());
+ assertThat(statements).hasSize(3);
+ assertThat(statements.get(0)).isSameAs(st1);
+ assertThat(statements.get(1)).isSameAs(st2);
+ assertThat(statements.get(2)).isSameAs(st3);
+ assertThat(actual.getConsistencyLevel()).isSameAs(QUORUM);
+ }
+
+ @Test
+ public void should_parse_bound_values() throws Exception {
+ //Given
+ String bs = "'jdoe',32,'John DOE',null, true, '2014-06-12 34:00:34'";
+
+ //When
+ final List<String> actual = this.<String>toJavaList(helper.parseBoundValues("ps", bs));
+
+ //Then
+ assertThat(actual).containsExactly("'jdoe'", "32", "'John DOE'",
+ "null", "true", "2014-06-12 34:00:34");
+ }
+
+ @Test
+ public void should_parse_simple_date() throws Exception {
+ //Given
+ String dateString = "2015-07-30 12:00:01";
+
+ //When
+ final Date actual = helper.parseDate(dateString);
+
+ //Then
+ Calendar calendar = Calendar.getInstance();
+ calendar.setTime(actual);
+
+ assertThat(calendar.get(Calendar.YEAR)).isEqualTo(2015);
+ assertThat(calendar.get(Calendar.MONTH)).isEqualTo(Calendar.JULY);
+ assertThat(calendar.get(Calendar.DAY_OF_MONTH)).isEqualTo(30);
+ assertThat(calendar.get(Calendar.HOUR_OF_DAY)).isEqualTo(12);
+ assertThat(calendar.get(Calendar.MINUTE)).isEqualTo(0);
+ assertThat(calendar.get(Calendar.SECOND)).isEqualTo(1);
+ }
+
+ @Test
+ public void should_parse_accurate_date() throws Exception {
+ //Given
+ String dateString = "2015-07-30 12:00:01.123";
+
+ //When
+ final Date actual = helper.parseDate(dateString);
+
+ //Then
+ Calendar calendar = Calendar.getInstance();
+ calendar.setTime(actual);
+
+ assertThat(calendar.get(Calendar.YEAR)).isEqualTo(2015);
+ assertThat(calendar.get(Calendar.MONTH)).isEqualTo(Calendar.JULY);
+ assertThat(calendar.get(Calendar.DAY_OF_MONTH)).isEqualTo(30);
+ assertThat(calendar.get(Calendar.HOUR_OF_DAY)).isEqualTo(12);
+ assertThat(calendar.get(Calendar.MINUTE)).isEqualTo(0);
+ assertThat(calendar.get(Calendar.SECOND)).isEqualTo(1);
+ assertThat(calendar.get(Calendar.MILLISECOND)).isEqualTo(123);
+ }
+
+ private <A> scala.collection.immutable.List<A> toScalaList(java.util.List<A> list) {
+ return scala.collection.JavaConversions.collectionAsScalaIterable(list).toList();
+ }
+
+ private <A> java.util.List<A> toJavaList(scala.collection.immutable.List<A> list){
+ return scala.collection.JavaConversions.seqAsJavaList(list);
+ }
+}
[2/2] zeppelin git commit: ZEPPELIN-3151. Fixed Checkstyle errors and
warnings in cassandra module
Posted by zj...@apache.org.
ZEPPELIN-3151. Fixed Checkstyle errors and warnings in cassandra module
### What is this PR for?
Fixed the Checkstyle errors and warnings in the cassandra module.
### What type of PR is it?
Improvement
### Todos
* [ ] - Task
### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-3151
### How should this be tested?
* CI pass
### Screenshots (if appropriate)
### Questions:
* Do the license files need an update? no
* Are there breaking changes for older versions? no
* Does this need documentation? no
Author: Jan Hentschel <ja...@ultratendency.com>
Closes #2785 from HorizonNet/ZEPPELIN-3151 and squashes the following commits:
646c410 [Jan Hentschel] ZEPPELIN-3151. Fixed Checkstyle errors and warnings in cassandra module
Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/8bb888b4
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/8bb888b4
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/8bb888b4
Branch: refs/heads/master
Commit: 8bb888b494b9f611a4e5d81bd8451626eadd7e12
Parents: b335cae
Author: Jan Hentschel <ja...@ultratendency.com>
Authored: Sat Feb 10 12:05:20 2018 +0100
Committer: Jeff Zhang <zj...@apache.org>
Committed: Wed Feb 21 12:07:38 2018 +0800
----------------------------------------------------------------------
cassandra/pom.xml | 7 +
.../cassandra/CassandraInterpreter.java | 48 +-
.../zeppelin/cassandra/ParsingException.java | 2 +-
.../zeppelin/cassandra/JavaDriverConfig.scala | 80 +-
.../cassandra/CassandraInterpreterTest.java | 1463 +++++++++---------
.../cassandra/InterpreterLogicTest.java | 636 ++++----
6 files changed, 1157 insertions(+), 1079 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8bb888b4/cassandra/pom.xml
----------------------------------------------------------------------
diff --git a/cassandra/pom.xml b/cassandra/pom.xml
index a07ed82..c5b08f7 100644
--- a/cassandra/pom.xml
+++ b/cassandra/pom.xml
@@ -249,6 +249,13 @@
<plugin>
<artifactId>maven-resources-plugin</artifactId>
</plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ <configuration>
+ <skip>false</skip>
+ </configuration>
+ </plugin>
</plugins>
</build>
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8bb888b4/cassandra/src/main/java/org/apache/zeppelin/cassandra/CassandraInterpreter.java
----------------------------------------------------------------------
diff --git a/cassandra/src/main/java/org/apache/zeppelin/cassandra/CassandraInterpreter.java b/cassandra/src/main/java/org/apache/zeppelin/cassandra/CassandraInterpreter.java
index 0f986be..105e271 100644
--- a/cassandra/src/main/java/org/apache/zeppelin/cassandra/CassandraInterpreter.java
+++ b/cassandra/src/main/java/org/apache/zeppelin/cassandra/CassandraInterpreter.java
@@ -16,21 +16,11 @@
*/
package org.apache.zeppelin.cassandra;
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.JdkSSLOptions;
-import com.datastax.driver.core.ProtocolOptions.Compression;
-import com.datastax.driver.core.Session;
-import org.apache.zeppelin.interpreter.Interpreter;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterResult;
-import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
-import org.apache.zeppelin.scheduler.Scheduler;
-import org.apache.zeppelin.scheduler.SchedulerFactory;
+import static java.lang.Integer.parseInt;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.TrustManagerFactory;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
@@ -39,10 +29,23 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
-import static java.lang.Integer.parseInt;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManagerFactory;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.JdkSSLOptions;
+import com.datastax.driver.core.ProtocolOptions.Compression;
+import com.datastax.driver.core.Session;
+
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
+import org.apache.zeppelin.scheduler.Scheduler;
+import org.apache.zeppelin.scheduler.SchedulerFactory;
/**
- * Interpreter for Apache Cassandra CQL query language
+ * Interpreter for Apache Cassandra CQL query language.
*/
public class CassandraInterpreter extends Interpreter {
@@ -128,14 +131,14 @@ public class CassandraInterpreter extends Interpreter {
public static final String DEFAULT_CREDENTIAL = "none";
public static final String DEFAULT_POLICY = "DEFAULT";
public static final String DEFAULT_PARALLELISM = "10";
- static String DEFAULT_NEW_CONNECTION_THRESHOLD_LOCAL = "100";
- static String DEFAULT_NEW_CONNECTION_THRESHOLD_REMOTE = "100";
- static String DEFAULT_CORE_CONNECTION_PER_HOST_LOCAL = "2";
- static String DEFAULT_CORE_CONNECTION_PER_HOST_REMOTE = "1";
- static String DEFAULT_MAX_CONNECTION_PER_HOST_LOCAL = "8";
- static String DEFAULT_MAX_CONNECTION_PER_HOST_REMOTE = "2";
- static String DEFAULT_MAX_REQUEST_PER_CONNECTION_LOCAL = "1024";
- static String DEFAULT_MAX_REQUEST_PER_CONNECTION_REMOTE = "256";
+ static String defaultNewConnectionThresholdLocal = "100";
+ static String defaultNewConnectionThresholdRemote = "100";
+ static String defaultCoreConnectionPerHostLocal = "2";
+ static String defaultCoreConnectionPerHostRemote = "1";
+ static String defaultMaxConnectionPerHostLocal = "8";
+ static String defaultMaxConnectionPerHostRemote = "2";
+ static String defaultMaxRequestPerConnectionLocal = "1024";
+ static String defaultMaxRequestPerConnectionRemote = "256";
public static final String DEFAULT_IDLE_TIMEOUT = "120";
public static final String DEFAULT_POOL_TIMEOUT = "5000";
public static final String DEFAULT_HEARTBEAT_INTERVAL = "30";
@@ -244,7 +247,6 @@ public class CassandraInterpreter extends Interpreter {
@Override
public void cancel(InterpreterContext context) {
-
}
@Override
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8bb888b4/cassandra/src/main/java/org/apache/zeppelin/cassandra/ParsingException.java
----------------------------------------------------------------------
diff --git a/cassandra/src/main/java/org/apache/zeppelin/cassandra/ParsingException.java b/cassandra/src/main/java/org/apache/zeppelin/cassandra/ParsingException.java
index da9bb0c..b87e8b2 100644
--- a/cassandra/src/main/java/org/apache/zeppelin/cassandra/ParsingException.java
+++ b/cassandra/src/main/java/org/apache/zeppelin/cassandra/ParsingException.java
@@ -17,7 +17,7 @@
package org.apache.zeppelin.cassandra;
/**
- * Parsing Exception for Cassandra CQL statement
+ * Parsing Exception for Cassandra CQL statement.
*/
public class ParsingException extends RuntimeException{
public ParsingException(String message) {
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8bb888b4/cassandra/src/main/scala/org/apache/zeppelin/cassandra/JavaDriverConfig.scala
----------------------------------------------------------------------
diff --git a/cassandra/src/main/scala/org/apache/zeppelin/cassandra/JavaDriverConfig.scala b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/JavaDriverConfig.scala
index 5b2dbef..865d89f 100644
--- a/cassandra/src/main/scala/org/apache/zeppelin/cassandra/JavaDriverConfig.scala
+++ b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/JavaDriverConfig.scala
@@ -178,54 +178,54 @@ class JavaDriverConfig {
protocolVersion match {
case "1" =>
- DEFAULT_MAX_CONNECTION_PER_HOST_LOCAL = "8"
- DEFAULT_MAX_CONNECTION_PER_HOST_REMOTE = "2"
- DEFAULT_CORE_CONNECTION_PER_HOST_LOCAL = "2"
- DEFAULT_CORE_CONNECTION_PER_HOST_REMOTE = "1"
- DEFAULT_NEW_CONNECTION_THRESHOLD_LOCAL = "100"
- DEFAULT_NEW_CONNECTION_THRESHOLD_REMOTE = "1"
- DEFAULT_MAX_REQUEST_PER_CONNECTION_LOCAL = "128"
- DEFAULT_MAX_REQUEST_PER_CONNECTION_REMOTE = "128"
+ defaultMaxConnectionPerHostLocal = "8"
+ defaultMaxConnectionPerHostRemote = "2"
+ defaultCoreConnectionPerHostLocal = "2"
+ defaultCoreConnectionPerHostRemote = "1"
+ defaultNewConnectionThresholdLocal = "100"
+ defaultNewConnectionThresholdRemote = "1"
+ defaultMaxRequestPerConnectionLocal = "128"
+ defaultMaxRequestPerConnectionRemote = "128"
return ProtocolVersion.V1
case "2" =>
- DEFAULT_MAX_CONNECTION_PER_HOST_LOCAL = "8"
- DEFAULT_MAX_CONNECTION_PER_HOST_REMOTE = "2"
- DEFAULT_CORE_CONNECTION_PER_HOST_LOCAL = "2"
- DEFAULT_CORE_CONNECTION_PER_HOST_REMOTE = "1"
- DEFAULT_NEW_CONNECTION_THRESHOLD_LOCAL = "100"
- DEFAULT_NEW_CONNECTION_THRESHOLD_REMOTE = "1"
- DEFAULT_MAX_REQUEST_PER_CONNECTION_LOCAL = "128"
- DEFAULT_MAX_REQUEST_PER_CONNECTION_REMOTE = "128"
+ defaultMaxConnectionPerHostLocal = "8"
+ defaultMaxConnectionPerHostRemote = "2"
+ defaultCoreConnectionPerHostLocal = "2"
+ defaultCoreConnectionPerHostRemote = "1"
+ defaultNewConnectionThresholdLocal = "100"
+ defaultNewConnectionThresholdRemote = "1"
+ defaultMaxRequestPerConnectionLocal = "128"
+ defaultMaxRequestPerConnectionRemote = "128"
return ProtocolVersion.V2
case "3" =>
- DEFAULT_MAX_CONNECTION_PER_HOST_LOCAL = "1"
- DEFAULT_MAX_CONNECTION_PER_HOST_REMOTE = "1"
- DEFAULT_CORE_CONNECTION_PER_HOST_LOCAL = "1"
- DEFAULT_CORE_CONNECTION_PER_HOST_REMOTE = "1"
- DEFAULT_NEW_CONNECTION_THRESHOLD_LOCAL = "800"
- DEFAULT_NEW_CONNECTION_THRESHOLD_REMOTE = "200"
- DEFAULT_MAX_REQUEST_PER_CONNECTION_LOCAL = "1024"
- DEFAULT_MAX_REQUEST_PER_CONNECTION_REMOTE = "256"
+ defaultMaxConnectionPerHostLocal = "1"
+ defaultMaxConnectionPerHostRemote = "1"
+ defaultCoreConnectionPerHostLocal = "1"
+ defaultCoreConnectionPerHostRemote = "1"
+ defaultNewConnectionThresholdLocal = "800"
+ defaultNewConnectionThresholdRemote = "200"
+ defaultMaxRequestPerConnectionLocal = "1024"
+ defaultMaxRequestPerConnectionRemote = "256"
return ProtocolVersion.V3
case "4" =>
- DEFAULT_MAX_CONNECTION_PER_HOST_LOCAL = "1"
- DEFAULT_MAX_CONNECTION_PER_HOST_REMOTE = "1"
- DEFAULT_CORE_CONNECTION_PER_HOST_LOCAL = "1"
- DEFAULT_CORE_CONNECTION_PER_HOST_REMOTE = "1"
- DEFAULT_NEW_CONNECTION_THRESHOLD_LOCAL = "800"
- DEFAULT_NEW_CONNECTION_THRESHOLD_REMOTE = "200"
- DEFAULT_MAX_REQUEST_PER_CONNECTION_LOCAL = "1024"
- DEFAULT_MAX_REQUEST_PER_CONNECTION_REMOTE = "256"
+ defaultMaxConnectionPerHostLocal = "1"
+ defaultMaxConnectionPerHostRemote = "1"
+ defaultCoreConnectionPerHostLocal = "1"
+ defaultCoreConnectionPerHostRemote = "1"
+ defaultNewConnectionThresholdLocal = "800"
+ defaultNewConnectionThresholdRemote = "200"
+ defaultMaxRequestPerConnectionLocal = "1024"
+ defaultMaxRequestPerConnectionRemote = "256"
return ProtocolVersion.V4
case _ =>
- DEFAULT_MAX_CONNECTION_PER_HOST_LOCAL = "1"
- DEFAULT_MAX_CONNECTION_PER_HOST_REMOTE = "1"
- DEFAULT_CORE_CONNECTION_PER_HOST_LOCAL = "1"
- DEFAULT_CORE_CONNECTION_PER_HOST_REMOTE = "1"
- DEFAULT_NEW_CONNECTION_THRESHOLD_LOCAL = "800"
- DEFAULT_NEW_CONNECTION_THRESHOLD_REMOTE = "200"
- DEFAULT_MAX_REQUEST_PER_CONNECTION_LOCAL = "1024"
- DEFAULT_MAX_REQUEST_PER_CONNECTION_REMOTE = "256"
+ defaultMaxConnectionPerHostLocal = "1"
+ defaultMaxConnectionPerHostRemote = "1"
+ defaultCoreConnectionPerHostLocal = "1"
+ defaultCoreConnectionPerHostRemote = "1"
+ defaultNewConnectionThresholdLocal = "800"
+ defaultNewConnectionThresholdRemote = "200"
+ defaultMaxRequestPerConnectionLocal = "1024"
+ defaultMaxRequestPerConnectionRemote = "256"
return ProtocolVersion.NEWEST_SUPPORTED
}
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8bb888b4/cassandra/src/test/java/org/apache/zeppelin/cassandra/CassandraInterpreterTest.java
----------------------------------------------------------------------
diff --git a/cassandra/src/test/java/org/apache/zeppelin/cassandra/CassandraInterpreterTest.java b/cassandra/src/test/java/org/apache/zeppelin/cassandra/CassandraInterpreterTest.java
index cf392bb..df5e4ac 100644
--- a/cassandra/src/test/java/org/apache/zeppelin/cassandra/CassandraInterpreterTest.java
+++ b/cassandra/src/test/java/org/apache/zeppelin/cassandra/CassandraInterpreterTest.java
@@ -16,18 +16,57 @@
*/
package org.apache.zeppelin.cassandra;
-import static com.datastax.driver.core.ProtocolOptions.DEFAULT_MAX_SCHEMA_AGREEMENT_WAIT_SECONDS;
import static com.google.common.collect.FluentIterable.from;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.when;
+
+import static com.datastax.driver.core.ProtocolOptions.DEFAULT_MAX_SCHEMA_AGREEMENT_WAIT_SECONDS;
+
import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_CLUSTER_NAME;
import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_COMPRESSION_PROTOCOL;
import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_CREDENTIALS_PASSWORD;
import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_CREDENTIALS_USERNAME;
import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_HOSTS;
+import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_LOAD_BALANCING_POLICY;
+import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_MAX_SCHEMA_AGREEMENT_WAIT_SECONDS;
+import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_POOLING_CORE_CONNECTION_PER_HOST_LOCAL;
+import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_POOLING_CORE_CONNECTION_PER_HOST_REMOTE;
+import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_POOLING_HEARTBEAT_INTERVAL_SECONDS;
+import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_POOLING_IDLE_TIMEOUT_SECONDS;
+import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_POOLING_MAX_CONNECTION_PER_HOST_LOCAL;
+import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_POOLING_MAX_CONNECTION_PER_HOST_REMOTE;
+import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_POOLING_MAX_REQUESTS_PER_CONNECTION_LOCAL;
+import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_POOLING_MAX_REQUESTS_PER_CONNECTION_REMOTE;
+import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_POOLING_NEW_CONNECTION_THRESHOLD_LOCAL;
+import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_POOLING_NEW_CONNECTION_THRESHOLD_REMOTE;
+import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_POOLING_POOL_TIMEOUT_MILLIS;
import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_PORT;
import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_PROTOCOL_VERSION;
-import static org.apache.zeppelin.cassandra.CassandraInterpreter.*;
-import static org.assertj.core.api.Assertions.*;
-import static org.mockito.Mockito.when;
+import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_QUERY_DEFAULT_CONSISTENCY;
+import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_QUERY_DEFAULT_FETCH_SIZE;
+import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_QUERY_DEFAULT_SERIAL_CONSISTENCY;
+import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_RECONNECTION_POLICY;
+import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_RETRY_POLICY;
+import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_SOCKET_CONNECTION_TIMEOUT_MILLIS;
+import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_SOCKET_READ_TIMEOUT_MILLIS;
+import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_SOCKET_TCP_NO_DELAY;
+import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_SPECULATIVE_EXECUTION_POLICY;
+
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Answers;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.Properties;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ProtocolVersion;
@@ -40,714 +79,716 @@ import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
-import org.junit.*;
-import org.junit.runner.RunWith;
-import org.mockito.Answers;
-import org.mockito.Mock;
-import org.mockito.runners.MockitoJUnitRunner;
-
-import java.io.BufferedReader;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.util.Properties;
@RunWith(MockitoJUnitRunner.class)
public class CassandraInterpreterTest {
-
- private static final String ARTISTS_TABLE = "zeppelin.artists";
-
- public static Session session = CassandraEmbeddedServerBuilder
- .noEntityPackages()
- .withKeyspaceName("zeppelin")
- .withScript("prepare_schema.cql")
- .withScript("prepare_data.cql")
- .withProtocolVersion(ProtocolVersion.V3)
- .buildNativeSessionOnly();
-
- private static CassandraInterpreter interpreter;
-
- @Mock(answer = Answers.RETURNS_DEEP_STUBS)
- private InterpreterContext intrContext;
-
- @BeforeClass
- public static void setUp() {
- Properties properties = new Properties();
- final Cluster cluster = session.getCluster();
-
- properties.setProperty(CASSANDRA_CLUSTER_NAME, cluster.getClusterName());
- properties.setProperty(CASSANDRA_COMPRESSION_PROTOCOL, "NONE");
- properties.setProperty(CASSANDRA_CREDENTIALS_USERNAME, "none");
- properties.setProperty(CASSANDRA_CREDENTIALS_PASSWORD, "none");
-
- properties.setProperty(CASSANDRA_PROTOCOL_VERSION, "3");
- properties.setProperty(CASSANDRA_LOAD_BALANCING_POLICY, "DEFAULT");
- properties.setProperty(CASSANDRA_RETRY_POLICY, "DEFAULT");
- properties.setProperty(CASSANDRA_RECONNECTION_POLICY, "DEFAULT");
- properties.setProperty(CASSANDRA_SPECULATIVE_EXECUTION_POLICY, "DEFAULT");
-
- properties.setProperty(CASSANDRA_MAX_SCHEMA_AGREEMENT_WAIT_SECONDS,
- DEFAULT_MAX_SCHEMA_AGREEMENT_WAIT_SECONDS + "");
-
- properties.setProperty(CASSANDRA_POOLING_NEW_CONNECTION_THRESHOLD_LOCAL, "100");
- properties.setProperty(CASSANDRA_POOLING_NEW_CONNECTION_THRESHOLD_REMOTE, "100");
- properties.setProperty(CASSANDRA_POOLING_CORE_CONNECTION_PER_HOST_LOCAL, "2");
- properties.setProperty(CASSANDRA_POOLING_CORE_CONNECTION_PER_HOST_REMOTE, "1");
- properties.setProperty(CASSANDRA_POOLING_MAX_CONNECTION_PER_HOST_LOCAL, "8");
- properties.setProperty(CASSANDRA_POOLING_MAX_CONNECTION_PER_HOST_REMOTE, "2");
- properties.setProperty(CASSANDRA_POOLING_MAX_REQUESTS_PER_CONNECTION_LOCAL, "1024");
- properties.setProperty(CASSANDRA_POOLING_MAX_REQUESTS_PER_CONNECTION_REMOTE, "256");
-
- properties.setProperty(CASSANDRA_POOLING_IDLE_TIMEOUT_SECONDS, "120");
- properties.setProperty(CASSANDRA_POOLING_POOL_TIMEOUT_MILLIS, "5000");
- properties.setProperty(CASSANDRA_POOLING_HEARTBEAT_INTERVAL_SECONDS, "30");
-
- properties.setProperty(CASSANDRA_QUERY_DEFAULT_CONSISTENCY, "ONE");
- properties.setProperty(CASSANDRA_QUERY_DEFAULT_SERIAL_CONSISTENCY, "SERIAL");
- properties.setProperty(CASSANDRA_QUERY_DEFAULT_FETCH_SIZE, "5000");
-
- properties.setProperty(CASSANDRA_SOCKET_CONNECTION_TIMEOUT_MILLIS, "5000");
- properties.setProperty(CASSANDRA_SOCKET_READ_TIMEOUT_MILLIS, "12000");
- properties.setProperty(CASSANDRA_SOCKET_TCP_NO_DELAY, "true");
-
- properties.setProperty(CASSANDRA_HOSTS, from(cluster.getMetadata().getAllHosts()).first().get().getAddress().getHostAddress());
- properties.setProperty(CASSANDRA_PORT, cluster.getConfiguration().getProtocolOptions().getPort()+"");
- interpreter = new CassandraInterpreter(properties);
- interpreter.open();
- }
-
- @AfterClass
- public static void tearDown() {
- interpreter.close();
- }
-
- @Before
- public void prepareContext() {
- when(intrContext.getParagraphTitle()).thenReturn("Paragraph1");
- }
-
- @Test
- public void should_create_cluster_and_session_upon_call_to_open() throws Exception {
- assertThat(interpreter.cluster).isNotNull();
- assertThat(interpreter.cluster.getClusterName()).isEqualTo(session.getCluster().getClusterName());
- assertThat(interpreter.session).isNotNull();
- assertThat(interpreter.helper).isNotNull();
- }
-
- @Test
- public void should_interpret_simple_select() throws Exception {
- //Given
-
- //When
- final InterpreterResult actual = interpreter.interpret("SELECT * FROM " + ARTISTS_TABLE + " LIMIT 10;", intrContext);
-
- //Then
- assertThat(actual).isNotNull();
- assertThat(actual.code()).isEqualTo(Code.SUCCESS);
- assertThat(actual.message().get(0).getData()).isEqualTo("name\tborn\tcountry\tdied\tgender\tstyles\ttype\n" +
- "Bogdan Raczynski\t1977-01-01\tPoland\tnull\tMale\t[Dance, Electro]\tPerson\n" +
- "Krishna Das\t1947-05-31\tUSA\tnull\tMale\t[Unknown]\tPerson\n" +
- "Sheryl Crow\t1962-02-11\tUSA\tnull\tFemale\t[Classic, Rock, Country, Blues, Pop, Folk]\tPerson\n" +
- "Doof\t1968-08-31\tUnited Kingdom\tnull\tnull\t[Unknown]\tPerson\n" +
- "House of Large Sizes\t1986-01-01\tUSA\t2003\tnull\t[Unknown]\tGroup\n" +
- "Fanfarlo\t2006-01-01\tUnited Kingdom\tnull\tnull\t[Rock, Indie, Pop, Classic]\tGroup\n" +
- "Jeff Beck\t1944-06-24\tUnited Kingdom\tnull\tMale\t[Rock, Pop, Classic]\tPerson\n" +
- "Los Paranoias\tnull\tUnknown\tnull\tnull\t[Unknown]\tnull\n" +
- "…And You Will Know Us by the Trail of Dead\t1994-01-01\tUSA\tnull\tnull\t[Rock, Pop, Classic]\tGroup\n");
-
- }
-
- @Test
- public void should_interpret_select_statement() throws Exception {
- //Given
-
- //When
- final InterpreterResult actual = interpreter.interpret("SELECT * FROM " + ARTISTS_TABLE + " LIMIT 2;", intrContext);
-
- //Then
- assertThat(actual).isNotNull();
- assertThat(actual.code()).isEqualTo(Code.SUCCESS);
- assertThat(actual.message().get(0).getData()).isEqualTo("name\tborn\tcountry\tdied\tgender\tstyles\ttype\n" +
- "Bogdan Raczynski\t1977-01-01\tPoland\tnull\tMale\t[Dance, Electro]\tPerson\n" +
- "Krishna Das\t1947-05-31\tUSA\tnull\tMale\t[Unknown]\tPerson\n");
-
- }
-
- @Test
- public void should_interpret_multiple_statements_with_single_line_logged_batch() throws Exception {
- //Given
- String statements = "CREATE TABLE IF NOT EXISTS zeppelin.albums(\n" +
- " title text PRIMARY KEY,\n" +
- " artist text,\n" +
- " year int\n" +
- ");\n" +
- "BEGIN BATCH"+
- " INSERT INTO zeppelin.albums(title,artist,year) VALUES('The Impossible Dream EP','Carter the Unstoppable Sex Machine',1992);"+
- " INSERT INTO zeppelin.albums(title,artist,year) VALUES('The Way You Are','Tears for Fears',1983);"+
- " INSERT INTO zeppelin.albums(title,artist,year) VALUES('Primitive','Soulfly',2003);"+
- "APPLY BATCH;\n"+
- "SELECT * FROM zeppelin.albums;";
- //When
- final InterpreterResult actual = interpreter.interpret(statements, intrContext);
-
- //Then
- assertThat(actual.code()).isEqualTo(Code.SUCCESS);
- assertThat(actual.message().get(0).getData()).isEqualTo("title\tartist\tyear\n" +
- "The Impossible Dream EP\tCarter the Unstoppable Sex Machine\t1992\n" +
- "The Way You Are\tTears for Fears\t1983\n" +
- "Primitive\tSoulfly\t2003\n");
- }
+ private static final String ARTISTS_TABLE = "zeppelin.artists";
+
+ public static Session session = CassandraEmbeddedServerBuilder
+ .noEntityPackages()
+ .withKeyspaceName("zeppelin")
+ .withScript("prepare_schema.cql")
+ .withScript("prepare_data.cql")
+ .withProtocolVersion(ProtocolVersion.V3)
+ .buildNativeSessionOnly();
+
+ private static CassandraInterpreter interpreter;
+
+ @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+ private InterpreterContext intrContext;
+
+ @BeforeClass
+ public static void setUp() {
+ Properties properties = new Properties();
+ final Cluster cluster = session.getCluster();
+
+ properties.setProperty(CASSANDRA_CLUSTER_NAME, cluster.getClusterName());
+ properties.setProperty(CASSANDRA_COMPRESSION_PROTOCOL, "NONE");
+ properties.setProperty(CASSANDRA_CREDENTIALS_USERNAME, "none");
+ properties.setProperty(CASSANDRA_CREDENTIALS_PASSWORD, "none");
+
+ properties.setProperty(CASSANDRA_PROTOCOL_VERSION, "3");
+ properties.setProperty(CASSANDRA_LOAD_BALANCING_POLICY, "DEFAULT");
+ properties.setProperty(CASSANDRA_RETRY_POLICY, "DEFAULT");
+ properties.setProperty(CASSANDRA_RECONNECTION_POLICY, "DEFAULT");
+ properties.setProperty(CASSANDRA_SPECULATIVE_EXECUTION_POLICY, "DEFAULT");
+
+ properties.setProperty(CASSANDRA_MAX_SCHEMA_AGREEMENT_WAIT_SECONDS,
+ DEFAULT_MAX_SCHEMA_AGREEMENT_WAIT_SECONDS + "");
+
+ properties.setProperty(CASSANDRA_POOLING_NEW_CONNECTION_THRESHOLD_LOCAL, "100");
+ properties.setProperty(CASSANDRA_POOLING_NEW_CONNECTION_THRESHOLD_REMOTE, "100");
+ properties.setProperty(CASSANDRA_POOLING_CORE_CONNECTION_PER_HOST_LOCAL, "2");
+ properties.setProperty(CASSANDRA_POOLING_CORE_CONNECTION_PER_HOST_REMOTE, "1");
+ properties.setProperty(CASSANDRA_POOLING_MAX_CONNECTION_PER_HOST_LOCAL, "8");
+ properties.setProperty(CASSANDRA_POOLING_MAX_CONNECTION_PER_HOST_REMOTE, "2");
+ properties.setProperty(CASSANDRA_POOLING_MAX_REQUESTS_PER_CONNECTION_LOCAL, "1024");
+ properties.setProperty(CASSANDRA_POOLING_MAX_REQUESTS_PER_CONNECTION_REMOTE, "256");
+
+ properties.setProperty(CASSANDRA_POOLING_IDLE_TIMEOUT_SECONDS, "120");
+ properties.setProperty(CASSANDRA_POOLING_POOL_TIMEOUT_MILLIS, "5000");
+ properties.setProperty(CASSANDRA_POOLING_HEARTBEAT_INTERVAL_SECONDS, "30");
+
+ properties.setProperty(CASSANDRA_QUERY_DEFAULT_CONSISTENCY, "ONE");
+ properties.setProperty(CASSANDRA_QUERY_DEFAULT_SERIAL_CONSISTENCY, "SERIAL");
+ properties.setProperty(CASSANDRA_QUERY_DEFAULT_FETCH_SIZE, "5000");
+
+ properties.setProperty(CASSANDRA_SOCKET_CONNECTION_TIMEOUT_MILLIS, "5000");
+ properties.setProperty(CASSANDRA_SOCKET_READ_TIMEOUT_MILLIS, "12000");
+ properties.setProperty(CASSANDRA_SOCKET_TCP_NO_DELAY, "true");
+
+ properties.setProperty(CASSANDRA_HOSTS, from(cluster.getMetadata().getAllHosts()).first()
+ .get().getAddress().getHostAddress());
+ properties.setProperty(CASSANDRA_PORT, cluster.getConfiguration().getProtocolOptions()
+ .getPort() + "");
+ interpreter = new CassandraInterpreter(properties);
+ interpreter.open();
+ }
+
+ @AfterClass
+ public static void tearDown() {
+ interpreter.close();
+ }
+
+ @Before
+ public void prepareContext() {
+ when(intrContext.getParagraphTitle()).thenReturn("Paragraph1");
+ }
+
+ @Test
+ public void should_create_cluster_and_session_upon_call_to_open() throws Exception {
+ assertThat(interpreter.cluster).isNotNull();
+ assertThat(interpreter.cluster.getClusterName()).isEqualTo(session.getCluster()
+ .getClusterName());
+ assertThat(interpreter.session).isNotNull();
+ assertThat(interpreter.helper).isNotNull();
+ }
+
+ @Test
+ public void should_interpret_simple_select() throws Exception {
+ //Given
+
+ //When
+ final InterpreterResult actual = interpreter.interpret("SELECT * FROM " + ARTISTS_TABLE +
+ " LIMIT 10;", intrContext);
+
+ //Then
+ assertThat(actual).isNotNull();
+ assertThat(actual.code()).isEqualTo(Code.SUCCESS);
+ assertThat(actual.message().get(0).getData()).isEqualTo("name\tborn\tcountry\tdied\tgender\t" +
+ "styles\ttype\n" +
+ "Bogdan Raczynski\t1977-01-01\tPoland\tnull\tMale\t[Dance, Electro]\tPerson\n" +
+ "Krishna Das\t1947-05-31\tUSA\tnull\tMale\t[Unknown]\tPerson\n" +
+ "Sheryl Crow\t1962-02-11\tUSA\tnull\tFemale\t" +
+ "[Classic, Rock, Country, Blues, Pop, Folk]\tPerson\n" +
+ "Doof\t1968-08-31\tUnited Kingdom\tnull\tnull\t[Unknown]\tPerson\n" +
+ "House of Large Sizes\t1986-01-01\tUSA\t2003\tnull\t[Unknown]\tGroup\n" +
+ "Fanfarlo\t2006-01-01\tUnited Kingdom\tnull\tnull\t" +
+ "[Rock, Indie, Pop, Classic]\tGroup\n" +
+ "Jeff Beck\t1944-06-24\tUnited Kingdom\tnull\tMale\t[Rock, Pop, Classic]\tPerson\n" +
+ "Los Paranoias\tnull\tUnknown\tnull\tnull\t[Unknown]\tnull\n" +
+ "…And You Will Know Us by the Trail of Dead\t1994-01-01\tUSA\tnull\tnull\t" +
+ "[Rock, Pop, Classic]\tGroup\n");
+ }
+
+ @Test
+ public void should_interpret_select_statement() throws Exception {
+ //Given
+
+ //When
+ final InterpreterResult actual = interpreter.interpret("SELECT * FROM " + ARTISTS_TABLE +
+ " LIMIT 2;", intrContext);
+
+ //Then
+ assertThat(actual).isNotNull();
+ assertThat(actual.code()).isEqualTo(Code.SUCCESS);
+ assertThat(actual.message().get(0).getData())
+ .isEqualTo("name\tborn\tcountry\tdied\tgender\tstyles\ttype\n" +
+ "Bogdan Raczynski\t1977-01-01\tPoland\tnull\tMale\t[Dance, Electro]\tPerson\n" +
+ "Krishna Das\t1947-05-31\tUSA\tnull\tMale\t[Unknown]\tPerson\n");
+ }
+
+ @Test
+ public void should_interpret_multiple_statements_with_single_line_logged_batch() {
+ //Given
+ String statements = "CREATE TABLE IF NOT EXISTS zeppelin.albums(\n" +
+ " title text PRIMARY KEY,\n" +
+ " artist text,\n" +
+ " year int\n" +
+ ");\n" +
+ "BEGIN BATCH" +
+ " INSERT INTO zeppelin.albums(title,artist,year) " +
+ "VALUES('The Impossible Dream EP','Carter the Unstoppable Sex Machine',1992);" +
+ " INSERT INTO zeppelin.albums(title,artist,year) " +
+ "VALUES('The Way You Are','Tears for Fears',1983);" +
+ " INSERT INTO zeppelin.albums(title,artist,year) " +
+ "VALUES('Primitive','Soulfly',2003);" +
+ "APPLY BATCH;\n" +
+ "SELECT * FROM zeppelin.albums;";
+ //When
+ final InterpreterResult actual = interpreter.interpret(statements, intrContext);
+
+ //Then
+ assertThat(actual.code()).isEqualTo(Code.SUCCESS);
+ assertThat(actual.message().get(0).getData()).isEqualTo("title\tartist\tyear\n" +
+ "The Impossible Dream EP\tCarter the Unstoppable Sex Machine\t1992\n" +
+ "The Way You Are\tTears for Fears\t1983\n" +
+ "Primitive\tSoulfly\t2003\n");
+ }
- @Test
- public void should_throw_statement_not_having_semi_colon() throws Exception {
- //Given
- String statement = "SELECT * zeppelin.albums";
-
- //When
- final InterpreterResult actual = interpreter.interpret(statement, intrContext);
-
- //Then
- assertThat(actual.code()).isEqualTo(Code.ERROR);
- assertThat(actual.message().get(0).getData())
- .contains("Error parsing input:\n" +
- "\t'SELECT * zeppelin.albums'\n" +
- "Did you forget to add ; (semi-colon) at the end of each CQL statement ?");
- }
-
- @Test
- public void should_validate_statement() throws Exception {
- //Given
- String statement = "SELECT * zeppelin.albums;";
-
- //When
- final InterpreterResult actual = interpreter.interpret(statement, intrContext);
-
- //Then
- assertThat(actual.code()).isEqualTo(Code.ERROR);
- assertThat(actual.message().get(0).getData()).contains("line 1:9 missing K_FROM at 'zeppelin' (SELECT * [zeppelin]....)");
- }
-
- @Test
- public void should_execute_statement_with_consistency_option() throws Exception {
- //Given
- String statement = "@consistency=THREE\n" +
- "SELECT * FROM zeppelin.artists LIMIT 1;";
-
- //When
- final InterpreterResult actual = interpreter.interpret(statement, intrContext);
-
- //Then
- assertThat(actual.code()).isEqualTo(Code.ERROR);
- assertThat(actual.message().get(0).getData())
- .contains("Not enough replicas available for query at consistency THREE (3 required but only 1 alive)");
- }
-
- @Test
- public void should_execute_statement_with_serial_consistency_option() throws Exception {
- //Given
- String statement = "@serialConsistency=SERIAL\n" +
- "SELECT * FROM zeppelin.artists LIMIT 1;";
-
- //When
- final InterpreterResult actual = interpreter.interpret(statement, intrContext);
-
- //Then
- assertThat(actual.code()).isEqualTo(Code.SUCCESS);
- }
-
- @Test
- public void should_execute_statement_with_timestamp_option() throws Exception {
- //Given
- String statement1 = "INSERT INTO zeppelin.ts(key,val) VALUES('k','v1');";
- String statement2 = "@timestamp=15\n" +
- "INSERT INTO zeppelin.ts(key,val) VALUES('k','v2');";
-
- // Insert v1 with current timestamp
- interpreter.interpret(statement1, intrContext);
-
- Thread.sleep(1);
-
- //When
- // Insert v2 with past timestamp
- interpreter.interpret(statement2, intrContext);
- final String actual = session.execute("SELECT * FROM zeppelin.ts LIMIT 1").one().getString("val");
-
- //Then
- assertThat(actual).isEqualTo("v1");
- }
-
- @Test
- public void should_execute_statement_with_retry_policy() throws Exception {
- //Given
- String statement = "@retryPolicy=" + interpreter.LOGGING_DOWNGRADING_RETRY + "\n" +
- "@consistency=THREE\n" +
- "SELECT * FROM zeppelin.artists LIMIT 1;";
-
- //When
- final InterpreterResult actual = interpreter.interpret(statement, intrContext);
-
- //Then
- assertThat(actual.code()).isEqualTo(Code.SUCCESS);
- }
-
- @Test
- public void should_execute_statement_with_request_timeout() throws Exception {
- //Given
- String statement = "@requestTimeOut=10000000\n" +
- "SELECT * FROM zeppelin.artists;";
-
- //When
- final InterpreterResult actual = interpreter.interpret(statement, intrContext);
-
- //Then
- assertThat(actual.code()).isEqualTo(Code.SUCCESS);
- }
-
- @Test
- public void should_execute_prepared_and_bound_statements() throws Exception {
- //Given
- String queries = "@prepare[ps]=INSERT INTO zeppelin.prepared(key,val) VALUES(?,?)\n" +
- "@prepare[select]=SELECT * FROM zeppelin.prepared WHERE key=:key\n" +
- "@bind[ps]='myKey','myValue'\n" +
- "@bind[select]='myKey'";
-
- //When
- final InterpreterResult actual = interpreter.interpret(queries, intrContext);
-
- //Then
- assertThat(actual.code()).isEqualTo(Code.SUCCESS);
- assertThat(actual.message().get(0).getData()).isEqualTo("key\tval\n" +
- "myKey\tmyValue\n");
- }
-
- @Test
- public void should_execute_bound_statement() throws Exception {
- //Given
- String queries = "@prepare[users_insert]=INSERT INTO zeppelin.users" +
- "(login,firstname,lastname,addresses,location)" +
- "VALUES(:login,:fn,:ln,:addresses,:loc)\n" +
- "@bind[users_insert]='jdoe','John','DOE'," +
- "{street_number: 3, street_name: 'Beverly Hills Bld', zip_code: 90209," +
- " country: 'USA', extra_info: ['Right on the hills','Next to the post box']," +
- " phone_numbers: {'home': 2016778524, 'office': 2015790847}}," +
- "('USA', 90209, 'Beverly Hills')\n" +
- "SELECT * FROM zeppelin.users WHERE login='jdoe';";
- //When
- final InterpreterResult actual = interpreter.interpret(queries, intrContext);
-
- //Then
- assertThat(actual.code()).isEqualTo(Code.SUCCESS);
- assertThat(actual.message().get(0).getData()).isEqualTo(
- "login\taddresses\tage\tdeceased\tfirstname\tlast_update\tlastname\tlocation\n" +
- "jdoe\t" +
- "{street_number:3,street_name:'Beverly Hills Bld',zip_code:90209," +
- "country:'USA',extra_info:['Right on the hills','Next to the post box']," +
- "phone_numbers:{'office':2015790847,'home':2016778524}}\tnull\t" +
- "null\t" +
- "John\t" +
- "null\t" +
- "DOE\t" +
- "('USA',90209,'Beverly Hills')\n");
- }
-
- @Test
- public void should_exception_when_executing_unknown_bound_statement() throws Exception {
- //Given
- String queries = "@bind[select_users]='jdoe'";
-
- //When
- final InterpreterResult actual = interpreter.interpret(queries, intrContext);
-
- //Then
- assertThat(actual.code()).isEqualTo(Code.ERROR);
- assertThat(actual.message().get(0).getData())
- .isEqualTo("The statement 'select_users' can not be bound to values. " +
- "Are you sure you did prepare it with @prepare[select_users] ?");
- }
-
- @Test
- public void should_extract_variable_from_statement() throws Exception {
- //Given
- AngularObjectRegistry angularObjectRegistry = new AngularObjectRegistry("cassandra", null);
- when(intrContext.getAngularObjectRegistry()).thenReturn(angularObjectRegistry);
- when(intrContext.getGui().input("login", "hsue")).thenReturn("hsue");
- when(intrContext.getGui().input("age", "27")).thenReturn("27");
-
- String queries = "@prepare[test_insert_with_variable]=" +
- "INSERT INTO zeppelin.users(login,firstname,lastname,age) VALUES(?,?,?,?)\n" +
- "@bind[test_insert_with_variable]='{{login=hsue}}','Helen','SUE',{{age=27}}\n" +
- "SELECT firstname,lastname,age FROM zeppelin.users WHERE login='hsue';";
- //When
- final InterpreterResult actual = interpreter.interpret(queries, intrContext);
-
- //Then
- assertThat(actual.code()).isEqualTo(Code.SUCCESS);
- assertThat(actual.message().get(0).getData()).isEqualTo("firstname\tlastname\tage\n" +
- "Helen\tSUE\t27\n");
-
- }
-
- @Test
- public void should_just_prepare_statement() throws Exception {
- //Given
- String queries = "@prepare[just_prepare]=SELECT name,country,styles FROM zeppelin.artists LIMIT 3";
- final String expected = reformatHtml(
- readTestResource("/scalate/NoResult.html"));
-
- //When
- final InterpreterResult actual = interpreter.interpret(queries, intrContext);
-
- //Then
- assertThat(actual.code()).isEqualTo(Code.SUCCESS);
- assertThat(reformatHtml(actual.message().get(0).getData())).isEqualTo(expected);
- }
-
- @Test
- public void should_execute_bound_statement_with_no_bound_value() throws Exception {
- //Given
- String queries = "@prepare[select_no_bound_value]=SELECT name,country,styles FROM zeppelin.artists LIMIT 3\n" +
- "@bind[select_no_bound_value]";
-
- //When
- final InterpreterResult actual = interpreter.interpret(queries, intrContext);
-
- //Then
- assertThat(actual.code()).isEqualTo(Code.SUCCESS);
- assertThat(actual.message().get(0).getData()).isEqualTo("name\tcountry\tstyles\n" +
- "Bogdan Raczynski\tPoland\t[Dance, Electro]\n" +
- "Krishna Das\tUSA\t[Unknown]\n" +
- "Sheryl Crow\tUSA\t[Classic, Rock, Country, Blues, Pop, Folk]\n");
-
- }
-
- @Test
- public void should_parse_date_value() throws Exception {
- //Given
- String queries = "@prepare[parse_date]=INSERT INTO zeppelin.users(login,last_update) VALUES(?,?)\n" +
- "@bind[parse_date]='last_update','2015-07-30 12:00:01'\n" +
- "SELECT last_update FROM zeppelin.users WHERE login='last_update';";
- //When
- final InterpreterResult actual = interpreter.interpret(queries, intrContext);
-
- //Then
- assertThat(actual.code()).isEqualTo(Code.SUCCESS);
- assertThat(actual.message().get(0).getData()).contains("last_update\n" +
- "Thu Jul 30 12:00:01");
- }
-
- @Test
- public void should_bind_null_value() throws Exception {
- //Given
- String queries = "@prepare[bind_null]=INSERT INTO zeppelin.users(login,firstname,lastname) VALUES(?,?,?)\n" +
- "@bind[bind_null]='bind_null',null,'NULL'\n" +
- "SELECT firstname,lastname FROM zeppelin.users WHERE login='bind_null';";
- //When
- final InterpreterResult actual = interpreter.interpret(queries, intrContext);
-
- //Then
- assertThat(actual.code()).isEqualTo(Code.SUCCESS);
- assertThat(actual.message().get(0).getData()).isEqualTo("firstname\tlastname\n" +
- "null\tNULL\n");
- }
-
- @Test
- public void should_bind_boolean_value() throws Exception {
- //Given
- String queries = "@prepare[bind_boolean]=INSERT INTO zeppelin.users(login,deceased) VALUES(?,?)\n" +
- "@bind[bind_boolean]='bind_bool',false\n" +
- "SELECT login,deceased FROM zeppelin.users WHERE login='bind_bool';";
- //When
- final InterpreterResult actual = interpreter.interpret(queries, intrContext);
-
- //Then
- assertThat(actual.code()).isEqualTo(Code.SUCCESS);
- assertThat(actual.message().get(0).getData()).isEqualTo("login\tdeceased\n" +
- "bind_bool\tfalse\n");
- }
-
- @Test
- public void should_fail_when_executing_a_removed_prepared_statement() throws Exception {
- //Given
- String prepare_first = "@prepare[to_be_removed]=INSERT INTO zeppelin.users(login,deceased) VALUES(?,?)";
- interpreter.interpret(prepare_first, intrContext);
- String remove_prepared = "@remove_prepare[to_be_removed]\n" +
- "@bind[to_be_removed]='bind_bool'";
-
- //When
- final InterpreterResult actual = interpreter.interpret(remove_prepared, intrContext);
-
- //Then
- assertThat(actual.code()).isEqualTo(Code.ERROR);
- assertThat(actual.message().get(0).getData()).isEqualTo("The statement 'to_be_removed' can not be bound to values. " +
- "Are you sure you did prepare it with @prepare[to_be_removed] ?");
- }
-
- @Test
- public void should_display_statistics_for_non_select_statement() throws Exception {
- //Given
- String query = "USE zeppelin;\nCREATE TABLE IF NOT EXISTS no_select(id int PRIMARY KEY);";
- final String rawResult = reformatHtml(readTestResource("/scalate/NoResultWithExecutionInfo.html"));
-
- //When
- final InterpreterResult actual = interpreter.interpret(query, intrContext);
- final Cluster cluster = session.getCluster();
- final int port = cluster.getConfiguration().getProtocolOptions().getPort();
- final String address = cluster.getMetadata().getAllHosts().iterator().next()
- .getAddress().getHostAddress()
- .replaceAll("/", "").replaceAll("\\[", "").replaceAll("\\]","");
- //Then
- final String expected = rawResult.replaceAll("TRIED_HOSTS", address+":"+port)
- .replaceAll("QUERIED_HOSTS", address +":"+port);
-
-
- assertThat(actual.code()).isEqualTo(Code.SUCCESS);
- assertThat(reformatHtml(actual.message().get(0).getData())).isEqualTo(expected);
- }
-
- @Test
- public void should_error_and_display_stack_trace() throws Exception {
- //Given
- String query = "@consistency=THREE\n" +
- "SELECT * FROM zeppelin.users LIMIT 3;";
- //When
- final InterpreterResult actual = interpreter.interpret(query, intrContext);
-
- //Then
- assertThat(actual.code()).isEqualTo(Code.ERROR);
- assertThat(actual.message().get(0).getData()).contains("All host(s) tried for query failed");
- }
-
- @Test
- public void should_describe_cluster() throws Exception {
- //Given
-
- String query = "DESCRIBE CLUSTER;";
- final String expected = reformatHtml(
- readTestResource("/scalate/DescribeCluster.html"));
-
- //When
- final InterpreterResult actual = interpreter.interpret(query, intrContext);
-
- //Then
- assertThat(actual.code()).isEqualTo(Code.SUCCESS);
-
- assertThat(reformatHtml(actual.message().get(0).getData())).isEqualTo(expected);
- }
-
- @Test
- public void should_describe_keyspaces() throws Exception {
- //Given
- String query = "DESCRIBE KEYSPACES;";
- final String expected = reformatHtml(
- readTestResource("/scalate/DescribeKeyspaces.html"));
-
- //When
- final InterpreterResult actual = interpreter.interpret(query, intrContext);
-
- //Then
- assertThat(actual.code()).isEqualTo(Code.SUCCESS);
-
- assertThat(reformatHtml(actual.message().get(0).getData())).isEqualTo(expected);
- }
-
- @Test
- public void should_describe_keyspace() throws Exception {
- //Given
- String query = "DESCRIBE KEYSPACE live_data;";
- final String expected = reformatHtml(
- readTestResource("/scalate/DescribeKeyspace_live_data.html"));
-
- //When
- final InterpreterResult actual = interpreter.interpret(query, intrContext);
-
- //Then
- assertThat(actual.code()).isEqualTo(Code.SUCCESS);
-
- assertThat(reformatHtml(actual.message().get(0).getData())).isEqualTo(expected);
- }
-
- @Test
- @Ignore
- //TODO activate test when using Java 8 and C* 3.x
- public void should_describe_function() throws Exception {
- //Given
- Properties properties = new Properties();
- properties.setProperty(CASSANDRA_HOSTS, "127.0.0.1");
- properties.setProperty(CASSANDRA_PORT, "9042");
- Interpreter interpreter = new CassandraInterpreter(properties);
- interpreter.open();
-
- String createFunction = "CREATE FUNCTION zeppelin.maxof(val1 int,val2 int) " +
- "RETURNS NULL ON NULL INPUT " +
- "RETURNS int " +
- "LANGUAGE java " +
- "AS $$" +
- " return Math.max(val1, val2);\n" +
- "$$;";
- interpreter.interpret(createFunction, intrContext);
- String query = "DESCRIBE FUNCTION zeppelin.maxOf;";
-
- //When
- final InterpreterResult actual = interpreter.interpret(query, intrContext);
-
- //Then
- assertThat(actual.code()).isEqualTo(Code.SUCCESS);
- assertThat(actual.message()).isEqualTo("xxxxx");
- }
-
- @Test
- @Ignore
- //TODO activate test when using Java 8 and C* 3.x
- public void should_describe_aggregate() throws Exception {
- //Given
- Properties properties = new Properties();
- properties.setProperty(CASSANDRA_HOSTS, "127.0.0.1");
- properties.setProperty(CASSANDRA_PORT, "9042");
- Interpreter interpreter = new CassandraInterpreter(properties);
- interpreter.open();
-
- final String query = "DESCRIBE AGGREGATES;";
-
- //When
- final InterpreterResult actual = interpreter.interpret(query, intrContext);
-
- //Then
- assertThat(actual.code()).isEqualTo(Code.SUCCESS);
-
- }
-
- @Test
- @Ignore
- //TODO activate test when using Java 8 and C* 3.x
- public void should_describe_materialized_view() throws Exception {
- //Given
- Properties properties = new Properties();
- properties.setProperty(CASSANDRA_HOSTS, "127.0.0.1");
- properties.setProperty(CASSANDRA_PORT, "9042");
- Interpreter interpreter = new CassandraInterpreter(properties);
- interpreter.open();
-
- final String query = "DESCRIBE MATERIALIZED VIEWS;";
-
- //When
- final InterpreterResult actual = interpreter.interpret(query, intrContext);
-
- //Then
- assertThat(actual.code()).isEqualTo(Code.SUCCESS);
- }
-
- @Test
- public void should_describe_table() throws Exception {
- //Given
- String query = "DESCRIBE TABLE live_data.complex_table;";
- final String expected = reformatHtml(
- readTestResource("/scalate/DescribeTable_live_data_complex_table.html"));
-
- //When
- final InterpreterResult actual = interpreter.interpret(query, intrContext);
-
- //Then
- assertThat(actual.code()).isEqualTo(Code.SUCCESS);
-
- assertThat(reformatHtml(actual.message().get(0).getData())).isEqualTo(expected);
- }
-
- @Test
- public void should_describe_udt() throws Exception {
- //Given
- String query = "DESCRIBE TYPE live_data.address;";
- final String expected = reformatHtml(
- readTestResource("/scalate/DescribeType_live_data_address.html"));
-
- //When
- final InterpreterResult actual = interpreter.interpret(query, intrContext);
-
- //Then
- assertThat(actual.code()).isEqualTo(Code.SUCCESS);
-
- assertThat(reformatHtml(actual.message().get(0).getData())).isEqualTo(expected);
- }
-
- @Test
- public void should_describe_udt_withing_logged_in_keyspace() throws Exception {
- //Given
- String query = "USE live_data;\n" +
- "DESCRIBE TYPE address;";
- final String expected = reformatHtml(
- readTestResource("/scalate/DescribeType_live_data_address_within_current_keyspace.html"));
-
- //When
- final InterpreterResult actual = interpreter.interpret(query, intrContext);
-
- //Then
- assertThat(actual.code()).isEqualTo(Code.SUCCESS);
-
- assertThat(reformatHtml(actual.message().get(0).getData())).isEqualTo(expected);
- }
-
- @Test
- public void should_error_describing_non_existing_table() throws Exception {
- //Given
- String query = "USE system;\n" +
- "DESCRIBE TABLE complex_table;";
-
- //When
- final InterpreterResult actual = interpreter.interpret(query, intrContext);
-
- //Then
- assertThat(actual.code()).isEqualTo(Code.ERROR);
- assertThat(actual.message().get(0).getData()).contains("Cannot find table system.complex_table");
- }
-
- @Test
- public void should_error_describing_non_existing_udt() throws Exception {
- //Given
- String query = "USE system;\n" +
- "DESCRIBE TYPE address;";
-
- //When
- final InterpreterResult actual = interpreter.interpret(query, intrContext);
-
- //Then
- assertThat(actual.code()).isEqualTo(Code.ERROR);
- assertThat(actual.message().get(0).getData()).contains("Cannot find type system.address");
- }
-
- @Test
- public void should_show_help() throws Exception {
- //Given
- String query = "HELP;";
- final String expected = reformatHtml(readTestResource("/scalate/Help.html"));
-
- //When
- final InterpreterResult actual = interpreter.interpret(query, intrContext);
-
- //Then
- assertThat(actual.code()).isEqualTo(Code.SUCCESS);
- assertThat(reformatHtml(actual.message().get(0).getData())).isEqualTo(expected);
- }
-
- private static String reformatHtml(String rawHtml) {
- return rawHtml
- .replaceAll("\\s*\n\\s*","")
- .replaceAll(">\\s+<", "><")
- .replaceAll("(?s)data-target=\"#[a-f0-9-]+(?:_asCQL|_indices_asCQL)?\"", "")
- .replaceAll("(?s)id=\"[a-f0-9-]+(?:_asCQL|_indices_asCQL)?\"", "")
- .trim();
- }
-
- private static String readTestResource(String testResource) {
- StringBuilder builder = new StringBuilder();
- InputStream stream = testResource.getClass().getResourceAsStream(testResource);
-
- try (BufferedReader br = new BufferedReader(new InputStreamReader(stream))) {
- String line;
- while ((line = br.readLine()) != null) {
- builder.append(line).append("\n");
- }
- } catch (Exception ex) {
- throw new RuntimeException(ex);
- }
-
- return builder.toString();
- }
-}
\ No newline at end of file
+ @Test
+ public void should_throw_statement_not_having_semi_colon() throws Exception {
+ //Given
+ String statement = "SELECT * zeppelin.albums";
+
+ //When
+ final InterpreterResult actual = interpreter.interpret(statement, intrContext);
+
+ //Then
+ assertThat(actual.code()).isEqualTo(Code.ERROR);
+ assertThat(actual.message().get(0).getData())
+ .contains("Error parsing input:\n" +
+ "\t'SELECT * zeppelin.albums'\n" +
+ "Did you forget to add ; (semi-colon) at the end of each CQL statement ?");
+ }
+
+ @Test
+ public void should_validate_statement() throws Exception {
+   // Given: a terminated statement that lacks the FROM keyword
+   final String cql = "SELECT * zeppelin.albums;";
+
+   // When
+   final InterpreterResult result = interpreter.interpret(cql, intrContext);
+
+   // Then: the reported error pinpoints the missing keyword
+   assertThat(result.code()).isEqualTo(Code.ERROR);
+   final String output = result.message().get(0).getData();
+   assertThat(output)
+       .contains("line 1:9 missing K_FROM at 'zeppelin' (SELECT * [zeppelin]....)");
+ }
+
+ @Test
+ public void should_execute_statement_with_consistency_option() throws Exception {
+   // Given: a query preceded by a @consistency option
+   final String option = "@consistency=THREE\n";
+   final String select = "SELECT * FROM zeppelin.artists LIMIT 1;";
+   final String script = option + select;
+
+   // When
+   final InterpreterResult result = interpreter.interpret(script, intrContext);
+
+   // Then: the query is rejected because the requested consistency cannot be met
+   assertThat(result.code()).isEqualTo(Code.ERROR);
+   final String output = result.message().get(0).getData();
+   assertThat(output).contains(
+       "Not enough replicas available for query at consistency THREE (3 required " +
+       "but only 1 alive)");
+ }
+
+ @Test
+ public void should_execute_statement_with_serial_consistency_option() throws Exception {
+   // Given: a query preceded by a @serialConsistency option
+   final String option = "@serialConsistency=SERIAL\n";
+   final String select = "SELECT * FROM zeppelin.artists LIMIT 1;";
+   final String script = option + select;
+
+   // When
+   final InterpreterResult result = interpreter.interpret(script, intrContext);
+
+   // Then
+   assertThat(result.code()).isEqualTo(Code.SUCCESS);
+ }
+
+ @Test
+ public void should_execute_statement_with_timestamp_option() throws Exception {
+   // Given: two writes to the same key, the second one carrying an explicit @timestamp
+   final String insertWithoutOption = "INSERT INTO zeppelin.ts(key,val) VALUES('k','v1');";
+   final String insertWithTimestamp = "@timestamp=15\n" +
+       "INSERT INTO zeppelin.ts(key,val) VALUES('k','v2');";
+
+   // v1 is written first, with no explicit timestamp
+   interpreter.interpret(insertWithoutOption, intrContext);
+
+   Thread.sleep(1);
+
+   // When: v2 is written afterwards with @timestamp=15
+   interpreter.interpret(insertWithTimestamp, intrContext);
+   final String storedValue = session.execute("SELECT * FROM zeppelin.ts LIMIT 1")
+       .one()
+       .getString("val");
+
+   // Then: the row still holds v1
+   assertThat(storedValue).isEqualTo("v1");
+ }
+
+ @Test
+ public void should_execute_statement_with_retry_policy() throws Exception {
+   // Given: the same THREE-consistency query, this time with a @retryPolicy option
+   final String retryOption = "@retryPolicy=" + interpreter.LOGGING_DOWNGRADING_RETRY + "\n";
+   final String consistencyOption = "@consistency=THREE\n";
+   final String select = "SELECT * FROM zeppelin.artists LIMIT 1;";
+   final String script = retryOption + consistencyOption + select;
+
+   // When
+   final InterpreterResult result = interpreter.interpret(script, intrContext);
+
+   // Then
+   assertThat(result.code()).isEqualTo(Code.SUCCESS);
+ }
+
+ @Test
+ public void should_execute_statement_with_request_timeout() throws Exception {
+   // Given: a query preceded by a @requestTimeOut option
+   final String option = "@requestTimeOut=10000000\n";
+   final String select = "SELECT * FROM zeppelin.artists;";
+   final String script = option + select;
+
+   // When
+   final InterpreterResult result = interpreter.interpret(script, intrContext);
+
+   // Then
+   assertThat(result.code()).isEqualTo(Code.SUCCESS);
+ }
+
+ @Test
+ public void should_execute_prepared_and_bound_statements() throws Exception {
+   // Given: an insert and a select are prepared, then both are bound in the same script
+   final String prepareLines =
+       "@prepare[ps]=INSERT INTO zeppelin.prepared(key,val) VALUES(?,?)\n" +
+       "@prepare[select]=SELECT * FROM zeppelin.prepared WHERE key=:key\n";
+   final String bindLines =
+       "@bind[ps]='myKey','myValue'\n" +
+       "@bind[select]='myKey'";
+   final String script = prepareLines + bindLines;
+
+   // When
+   final InterpreterResult result = interpreter.interpret(script, intrContext);
+
+   // Then: the first message holds the row written by the bound insert
+   assertThat(result.code()).isEqualTo(Code.SUCCESS);
+   final String output = result.message().get(0).getData();
+   assertThat(output).isEqualTo("key\tval\n" +
+       "myKey\tmyValue\n");
+ }
+
+ @Test
+ public void should_execute_bound_statement() throws Exception {
+   // Given: a named-parameter insert bound with a UDT literal and a tuple literal
+   final String prepareLine = "@prepare[users_insert]=INSERT INTO zeppelin.users" +
+       "(login,firstname,lastname,addresses,location)" +
+       "VALUES(:login,:fn,:ln,:addresses,:loc)\n";
+   final String bindLine = "@bind[users_insert]='jdoe','John','DOE'," +
+       "{street_number: 3, street_name: 'Beverly Hills Bld', zip_code: 90209," +
+       " country: 'USA', extra_info: ['Right on the hills','Next to the post box']," +
+       " phone_numbers: {'home': 2016778524, 'office': 2015790847}}," +
+       "('USA', 90209, 'Beverly Hills')\n";
+   final String selectLine = "SELECT * FROM zeppelin.users WHERE login='jdoe';";
+   final String script = prepareLine + bindLine + selectLine;
+
+   // When
+   final InterpreterResult result = interpreter.interpret(script, intrContext);
+
+   // Then: the row is rendered as tab-separated text, unset columns showing as "null"
+   assertThat(result.code()).isEqualTo(Code.SUCCESS);
+   final String headerRow =
+       "login\taddresses\tage\tdeceased\tfirstname\tlast_update\tlastname\tlocation\n";
+   final String dataRow = "jdoe\t" +
+       "{street_number:3,street_name:'Beverly Hills Bld',zip_code:90209," +
+       "country:'USA',extra_info:['Right on the hills','Next to the post box']," +
+       "phone_numbers:{'office':2015790847,'home':2016778524}}\tnull\t" +
+       "null\t" +
+       "John\t" +
+       "null\t" +
+       "DOE\t" +
+       "('USA',90209,'Beverly Hills')\n";
+   final String output = result.message().get(0).getData();
+   assertThat(output).isEqualTo(headerRow + dataRow);
+ }
+
+ @Test
+ public void should_exception_when_executing_unknown_bound_statement() throws Exception {
+   // Given: a @bind that refers to a name no @prepare line in this script declares
+   final String script = "@bind[select_users]='jdoe'";
+
+   // When
+   final InterpreterResult result = interpreter.interpret(script, intrContext);
+
+   // Then: the error names the statement and suggests preparing it first
+   assertThat(result.code()).isEqualTo(Code.ERROR);
+   final String output = result.message().get(0).getData();
+   assertThat(output).isEqualTo(
+       "The statement 'select_users' can not be bound to values. " +
+       "Are you sure you did prepare it with @prepare[select_users] ?");
+ }
+
+ @Test
+ public void should_extract_variable_from_statement() throws Exception {
+   // Given: the mocked context answers the two {{name=default}} inputs used below
+   final AngularObjectRegistry registry = new AngularObjectRegistry("cassandra", null);
+   when(intrContext.getAngularObjectRegistry()).thenReturn(registry);
+   when(intrContext.getGui().input("login", "hsue")).thenReturn("hsue");
+   when(intrContext.getGui().input("age", "27")).thenReturn("27");
+
+   final String prepareLine = "@prepare[test_insert_with_variable]=" +
+       "INSERT INTO zeppelin.users(login,firstname,lastname,age) VALUES(?,?,?,?)\n";
+   final String bindLine =
+       "@bind[test_insert_with_variable]='{{login=hsue}}','Helen','SUE',{{age=27}}\n";
+   final String selectLine =
+       "SELECT firstname,lastname,age FROM zeppelin.users WHERE login='hsue';";
+   final String script = prepareLine + bindLine + selectLine;
+
+   // When
+   final InterpreterResult result = interpreter.interpret(script, intrContext);
+
+   // Then: the selected row carries the substituted values
+   assertThat(result.code()).isEqualTo(Code.SUCCESS);
+   final String output = result.message().get(0).getData();
+   assertThat(output).isEqualTo("firstname\tlastname\tage\n" +
+       "Helen\tSUE\t27\n");
+ }
+
+ @Test
+ public void should_just_prepare_statement() throws Exception {
+   // Given: a script that only prepares a statement and never binds it
+   final String script = "@prepare[just_prepare]=SELECT name,country,styles " +
+       "FROM zeppelin.artists LIMIT 3";
+   final String fixture = readTestResource("/scalate/NoResult.html");
+   final String expectedHtml = reformatHtml(fixture);
+
+   // When
+   final InterpreterResult result = interpreter.interpret(script, intrContext);
+
+   // Then: the normalised output equals the NoResult fixture
+   assertThat(result.code()).isEqualTo(Code.SUCCESS);
+   final String actualHtml = reformatHtml(result.message().get(0).getData());
+   assertThat(actualHtml).isEqualTo(expectedHtml);
+ }
+
+ @Test
+ public void should_execute_bound_statement_with_no_bound_value() throws Exception {
+   // Given: a parameterless prepared select followed by a @bind without any value
+   final String prepareLine = "@prepare[select_no_bound_value]=SELECT name,country,styles " +
+       "FROM zeppelin.artists LIMIT 3\n";
+   final String bindLine = "@bind[select_no_bound_value]";
+   final String script = prepareLine + bindLine;
+
+   // When
+   final InterpreterResult result = interpreter.interpret(script, intrContext);
+
+   // Then: a header row followed by three artist rows
+   assertThat(result.code()).isEqualTo(Code.SUCCESS);
+   final String output = result.message().get(0).getData();
+   assertThat(output).isEqualTo("name\tcountry\tstyles\n" +
+       "Bogdan Raczynski\tPoland\t[Dance, Electro]\n" +
+       "Krishna Das\tUSA\t[Unknown]\n" +
+       "Sheryl Crow\tUSA\t[Classic, Rock, Country, Blues, Pop, Folk]\n");
+ }
+
+ @Test
+ public void should_parse_date_value() throws Exception {
+   // Given: a date bound as a quoted 'yyyy-MM-dd HH:mm:ss' literal
+   final String prepareLine =
+       "@prepare[parse_date]=INSERT INTO zeppelin.users(login,last_update) " +
+       "VALUES(?,?)\n";
+   final String bindLine = "@bind[parse_date]='last_update','2015-07-30 12:00:01'\n";
+   final String selectLine =
+       "SELECT last_update FROM zeppelin.users WHERE login='last_update';";
+   final String script = prepareLine + bindLine + selectLine;
+
+   // When
+   final InterpreterResult result = interpreter.interpret(script, intrContext);
+
+   // Then: only the start of the rendered date is checked
+   assertThat(result.code()).isEqualTo(Code.SUCCESS);
+   final String output = result.message().get(0).getData();
+   assertThat(output).contains("last_update\n" +
+       "Thu Jul 30 12:00:01");
+ }
+
+ @Test
+ public void should_bind_null_value() throws Exception {
+   // Given: an unquoted null bound next to the quoted string 'NULL'
+   final String prepareLine =
+       "@prepare[bind_null]=INSERT INTO zeppelin.users(login,firstname,lastname) " +
+       "VALUES(?,?,?)\n";
+   final String bindLine = "@bind[bind_null]='bind_null',null,'NULL'\n";
+   final String selectLine =
+       "SELECT firstname,lastname FROM zeppelin.users WHERE login='bind_null';";
+   final String script = prepareLine + bindLine + selectLine;
+
+   // When
+   final InterpreterResult result = interpreter.interpret(script, intrContext);
+
+   // Then: firstname is rendered as null while lastname keeps the text NULL
+   assertThat(result.code()).isEqualTo(Code.SUCCESS);
+   final String output = result.message().get(0).getData();
+   assertThat(output).isEqualTo("firstname\tlastname\n" +
+       "null\tNULL\n");
+ }
+
+ @Test
+ public void should_bind_boolean_value() throws Exception {
+   // Given: an unquoted boolean literal bound to the 'deceased' column
+   final String prepareLine =
+       "@prepare[bind_boolean]=INSERT INTO zeppelin.users(login,deceased) " +
+       "VALUES(?,?)\n";
+   final String bindLine = "@bind[bind_boolean]='bind_bool',false\n";
+   final String selectLine =
+       "SELECT login,deceased FROM zeppelin.users WHERE login='bind_bool';";
+   final String script = prepareLine + bindLine + selectLine;
+
+   // When
+   final InterpreterResult result = interpreter.interpret(script, intrContext);
+
+   // Then
+   assertThat(result.code()).isEqualTo(Code.SUCCESS);
+   final String output = result.message().get(0).getData();
+   assertThat(output).isEqualTo("login\tdeceased\n" +
+       "bind_bool\tfalse\n");
+ }
+
+ @Test
+ public void should_fail_when_executing_a_removed_prepared_statement() throws Exception {
+   // Given: a statement prepared by a first interpreter call...
+   final String prepareScript =
+       "@prepare[to_be_removed]=INSERT INTO zeppelin.users(login,deceased) " +
+       "VALUES(?,?)";
+   interpreter.interpret(prepareScript, intrContext);
+   // ...then removed and bound within a second script
+   final String removeThenBindScript = "@remove_prepare[to_be_removed]\n" +
+       "@bind[to_be_removed]='bind_bool'";
+
+   // When
+   final InterpreterResult result = interpreter.interpret(removeThenBindScript, intrContext);
+
+   // Then: binding is refused as if the statement had never been prepared
+   assertThat(result.code()).isEqualTo(Code.ERROR);
+   final String output = result.message().get(0).getData();
+   assertThat(output).isEqualTo(
+       "The statement 'to_be_removed' can not be bound to values. " +
+       "Are you sure you did prepare it with @prepare[to_be_removed] ?");
+ }
+
+ @Test
+ public void should_display_statistics_for_non_select_statement() throws Exception {
+   // Given: two statements, neither of which is a SELECT
+   final String script =
+       "USE zeppelin;\nCREATE TABLE IF NOT EXISTS no_select(id int PRIMARY KEY);";
+   final String template = reformatHtml(
+       readTestResource("/scalate/NoResultWithExecutionInfo.html"));
+
+   // When
+   final InterpreterResult result = interpreter.interpret(script, intrContext);
+   final Cluster cluster = session.getCluster();
+   final int port = cluster.getConfiguration().getProtocolOptions().getPort();
+   final String rawAddress = cluster.getMetadata().getAllHosts().iterator().next()
+       .getAddress().getHostAddress();
+   // Strip any '/', '[' and ']' characters from the textual host address
+   final String address = rawAddress
+       .replaceAll("/", "")
+       .replaceAll("\\[", "")
+       .replaceAll("\\]", "");
+
+   // Then: the fixture placeholders are filled with the first host's address and port
+   final String hostAndPort = address + ":" + port;
+   final String expectedHtml = template
+       .replaceAll("TRIED_HOSTS", hostAndPort)
+       .replaceAll("QUERIED_HOSTS", hostAndPort);
+
+   assertThat(result.code()).isEqualTo(Code.SUCCESS);
+   final String actualHtml = reformatHtml(result.message().get(0).getData());
+   assertThat(actualHtml).isEqualTo(expectedHtml);
+ }
+
+ @Test
+ public void should_error_and_display_stack_trace() throws Exception {
+   // Given: a select on zeppelin.users run with @consistency=THREE
+   final String option = "@consistency=THREE\n";
+   final String select = "SELECT * FROM zeppelin.users LIMIT 3;";
+   final String script = option + select;
+
+   // When
+   final InterpreterResult result = interpreter.interpret(script, intrContext);
+
+   // Then: the driver failure text is surfaced in the first message
+   assertThat(result.code()).isEqualTo(Code.ERROR);
+   final String output = result.message().get(0).getData();
+   assertThat(output).contains("All host(s) tried for query failed");
+ }
+
+ @Test
+ public void should_describe_cluster() throws Exception {
+   // Given: the normalised DescribeCluster fixture as the reference rendering
+   final String command = "DESCRIBE CLUSTER;";
+   final String fixture = readTestResource("/scalate/DescribeCluster.html");
+   final String expectedHtml = reformatHtml(fixture);
+
+   // When
+   final InterpreterResult result = interpreter.interpret(command, intrContext);
+
+   // Then
+   assertThat(result.code()).isEqualTo(Code.SUCCESS);
+   final String actualHtml = reformatHtml(result.message().get(0).getData());
+   assertThat(actualHtml).isEqualTo(expectedHtml);
+ }
+
+ @Test
+ public void should_describe_keyspaces() throws Exception {
+   // Given: the normalised DescribeKeyspaces fixture as the reference rendering
+   final String command = "DESCRIBE KEYSPACES;";
+   final String fixture = readTestResource("/scalate/DescribeKeyspaces.html");
+   final String expectedHtml = reformatHtml(fixture);
+
+   // When
+   final InterpreterResult result = interpreter.interpret(command, intrContext);
+
+   // Then
+   assertThat(result.code()).isEqualTo(Code.SUCCESS);
+   final String actualHtml = reformatHtml(result.message().get(0).getData());
+   assertThat(actualHtml).isEqualTo(expectedHtml);
+ }
+
+ @Test
+ public void should_describe_keyspace() throws Exception {
+   // Given: the normalised fixture for the live_data keyspace
+   final String command = "DESCRIBE KEYSPACE live_data;";
+   final String fixture = readTestResource("/scalate/DescribeKeyspace_live_data.html");
+   final String expectedHtml = reformatHtml(fixture);
+
+   // When
+   final InterpreterResult result = interpreter.interpret(command, intrContext);
+
+   // Then
+   assertThat(result.code()).isEqualTo(Code.SUCCESS);
+   final String actualHtml = reformatHtml(result.message().get(0).getData());
+   assertThat(actualHtml).isEqualTo(expectedHtml);
+ }
+
+ @Test
+ @Ignore
+ //TODO(n.a.) activate test when using Java 8 and C* 3.x
+ public void should_describe_function() throws Exception {
+   //Given
+   // A dedicated interpreter is opened for this test; the local variable deliberately
+   // shadows the shared 'interpreter' used by the other tests.
+   Properties properties = new Properties();
+   properties.setProperty(CASSANDRA_HOSTS, "127.0.0.1");
+   properties.setProperty(CASSANDRA_PORT, "9042");
+   Interpreter interpreter = new CassandraInterpreter(properties);
+   interpreter.open();
+
+   try {
+     String createFunction = "CREATE FUNCTION zeppelin.maxof(val1 int,val2 int) " +
+         "RETURNS NULL ON NULL INPUT " +
+         "RETURNS int " +
+         "LANGUAGE java " +
+         "AS $$" +
+         " return Math.max(val1, val2);\n" +
+         "$$;";
+     interpreter.interpret(createFunction, intrContext);
+     String query = "DESCRIBE FUNCTION zeppelin.maxOf;";
+
+     //When
+     final InterpreterResult actual = interpreter.interpret(query, intrContext);
+
+     //Then
+     assertThat(actual.code()).isEqualTo(Code.SUCCESS);
+     // NOTE(review): "xxxxx" looks like a placeholder expectation; supply the real
+     // expected output before removing @Ignore.
+     assertThat(actual.message()).isEqualTo("xxxxx");
+   } finally {
+     // Close the interpreter opened above even when an assertion fails, so the test
+     // does not leak it.
+     interpreter.close();
+   }
+ }
+
+ @Test
+ @Ignore
+ //TODO(n.a.) activate test when using Java 8 and C* 3.x
+ public void should_describe_aggregate() throws Exception {
+   //Given
+   // A dedicated interpreter is opened for this test; the local variable deliberately
+   // shadows the shared 'interpreter' used by the other tests.
+   Properties properties = new Properties();
+   properties.setProperty(CASSANDRA_HOSTS, "127.0.0.1");
+   properties.setProperty(CASSANDRA_PORT, "9042");
+   Interpreter interpreter = new CassandraInterpreter(properties);
+   interpreter.open();
+
+   try {
+     final String query = "DESCRIBE AGGREGATES;";
+
+     //When
+     final InterpreterResult actual = interpreter.interpret(query, intrContext);
+
+     //Then
+     assertThat(actual.code()).isEqualTo(Code.SUCCESS);
+   } finally {
+     // Close the interpreter opened above even when the assertion fails, so the test
+     // does not leak it.
+     interpreter.close();
+   }
+ }
+
+ @Test
+ @Ignore
+ //TODO(n.a.) activate test when using Java 8 and C* 3.x
+ public void should_describe_materialized_view() throws Exception {
+   //Given
+   // A dedicated interpreter is opened for this test; the local variable deliberately
+   // shadows the shared 'interpreter' used by the other tests.
+   Properties properties = new Properties();
+   properties.setProperty(CASSANDRA_HOSTS, "127.0.0.1");
+   properties.setProperty(CASSANDRA_PORT, "9042");
+   Interpreter interpreter = new CassandraInterpreter(properties);
+   interpreter.open();
+
+   try {
+     final String query = "DESCRIBE MATERIALIZED VIEWS;";
+
+     //When
+     final InterpreterResult actual = interpreter.interpret(query, intrContext);
+
+     //Then
+     assertThat(actual.code()).isEqualTo(Code.SUCCESS);
+   } finally {
+     // Close the interpreter opened above even when the assertion fails, so the test
+     // does not leak it.
+     interpreter.close();
+   }
+ }
+
+ @Test
+ public void should_describe_table() throws Exception {
+   // Given: the normalised fixture for table live_data.complex_table
+   final String command = "DESCRIBE TABLE live_data.complex_table;";
+   final String fixture =
+       readTestResource("/scalate/DescribeTable_live_data_complex_table.html");
+   final String expectedHtml = reformatHtml(fixture);
+
+   // When
+   final InterpreterResult result = interpreter.interpret(command, intrContext);
+
+   // Then
+   assertThat(result.code()).isEqualTo(Code.SUCCESS);
+   final String actualHtml = reformatHtml(result.message().get(0).getData());
+   assertThat(actualHtml).isEqualTo(expectedHtml);
+ }
+
+ @Test
+ public void should_describe_udt() throws Exception {
+   // Given: the normalised fixture for the fully qualified type live_data.address
+   final String command = "DESCRIBE TYPE live_data.address;";
+   final String fixture = readTestResource("/scalate/DescribeType_live_data_address.html");
+   final String expectedHtml = reformatHtml(fixture);
+
+   // When
+   final InterpreterResult result = interpreter.interpret(command, intrContext);
+
+   // Then
+   assertThat(result.code()).isEqualTo(Code.SUCCESS);
+   final String actualHtml = reformatHtml(result.message().get(0).getData());
+   assertThat(actualHtml).isEqualTo(expectedHtml);
+ }
+
+ @Test
+ public void should_describe_udt_withing_logged_in_keyspace() throws Exception {
+   // Given: the type name is unqualified and follows a USE statement
+   final String useKeyspace = "USE live_data;\n";
+   final String describe = "DESCRIBE TYPE address;";
+   final String script = useKeyspace + describe;
+   final String fixture = readTestResource(
+       "/scalate/DescribeType_live_data_address_within_current_keyspace.html");
+   final String expectedHtml = reformatHtml(fixture);
+
+   // When
+   final InterpreterResult result = interpreter.interpret(script, intrContext);
+
+   // Then
+   assertThat(result.code()).isEqualTo(Code.SUCCESS);
+   final String actualHtml = reformatHtml(result.message().get(0).getData());
+   assertThat(actualHtml).isEqualTo(expectedHtml);
+ }
+
+ @Test
+ public void should_error_describing_non_existing_table() throws Exception {
+ //Given
+ String query = "USE system;\n" +
+ "DESCRIBE TABLE complex_table;";
+
+ //When
+ final InterpreterResult actual = interpreter.interpret(query, intrContext);
+
+ //Then
+ assertThat(actual.code()).isEqualTo(Code.ERROR);
+ assertThat(actual.message().get(0).getData())
+ .contains("Cannot find table system.complex_table");
+ }
+
+ @Test
+ public void should_error_describing_non_existing_udt() throws Exception {
+ //Given
+ String query = "USE system;\n" +
+ "DESCRIBE TYPE address;";
+
+ //When
+ final InterpreterResult actual = interpreter.interpret(query, intrContext);
+
+ //Then
+ assertThat(actual.code()).isEqualTo(Code.ERROR);
+ assertThat(actual.message().get(0).getData()).contains("Cannot find type system.address");
+ }
+
+ @Test
+ public void should_show_help() throws Exception {
+   // Given: the normalised Help fixture as the reference rendering
+   final String command = "HELP;";
+   final String fixture = readTestResource("/scalate/Help.html");
+   final String expectedHtml = reformatHtml(fixture);
+
+   // When
+   final InterpreterResult result = interpreter.interpret(command, intrContext);
+
+   // Then
+   assertThat(result.code()).isEqualTo(Code.SUCCESS);
+   final String actualHtml = reformatHtml(result.message().get(0).getData());
+   assertThat(actualHtml).isEqualTo(expectedHtml);
+ }
+
+ private static String reformatHtml(String rawHtml) {
+   // Remove every line break together with the whitespace surrounding it.
+   String html = rawHtml.replaceAll("\\s*\n\\s*", "");
+   // Remove whitespace left between a closing '>' and the next '<'.
+   html = html.replaceAll(">\\s+<", "><");
+   // Drop data-target / id attributes whose value is a hex-and-dash token, optionally
+   // suffixed with _asCQL or _indices_asCQL (presumably ids generated per rendering).
+   html = html.replaceAll("(?s)data-target=\"#[a-f0-9-]+(?:_asCQL|_indices_asCQL)?\"", "");
+   html = html.replaceAll("(?s)id=\"[a-f0-9-]+(?:_asCQL|_indices_asCQL)?\"", "");
+   return html.trim();
+ }
+
+ private static String readTestResource(String testResource) {
+ StringBuilder builder = new StringBuilder();
+ InputStream stream = testResource.getClass().getResourceAsStream(testResource);
+
+ try (BufferedReader br = new BufferedReader(new InputStreamReader(stream))) {
+ String line;
+ while ((line = br.readLine()) != null) {
+ builder.append(line).append("\n");
+ }
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+
+ return builder.toString();
+ }
+}