You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2018/02/21 04:07:44 UTC

[1/2] zeppelin git commit: ZEPPELIN-3151. Fixed Checkstyle errors and warnings in cassandra module

Repository: zeppelin
Updated Branches:
  refs/heads/master b335caed3 -> 8bb888b49


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8bb888b4/cassandra/src/test/java/org/apache/zeppelin/cassandra/InterpreterLogicTest.java
----------------------------------------------------------------------
diff --git a/cassandra/src/test/java/org/apache/zeppelin/cassandra/InterpreterLogicTest.java b/cassandra/src/test/java/org/apache/zeppelin/cassandra/InterpreterLogicTest.java
index f3848fd..e096a0c 100644
--- a/cassandra/src/test/java/org/apache/zeppelin/cassandra/InterpreterLogicTest.java
+++ b/cassandra/src/test/java/org/apache/zeppelin/cassandra/InterpreterLogicTest.java
@@ -16,27 +16,23 @@
  */
 package org.apache.zeppelin.cassandra;
 
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
+
+import static java.util.Arrays.asList;
+
 import static com.datastax.driver.core.BatchStatement.Type.UNLOGGED;
 import static com.datastax.driver.core.ConsistencyLevel.ALL;
 import static com.datastax.driver.core.ConsistencyLevel.LOCAL_SERIAL;
 import static com.datastax.driver.core.ConsistencyLevel.ONE;
 import static com.datastax.driver.core.ConsistencyLevel.QUORUM;
 import static com.datastax.driver.core.ConsistencyLevel.SERIAL;
-import static java.util.Arrays.asList;
-import static org.assertj.core.api.Assertions.*;
-import static org.mockito.Mockito.*;
 
-import com.datastax.driver.core.BatchStatement;
-import com.datastax.driver.core.ConsistencyLevel;
-import com.datastax.driver.core.Session;
-import com.datastax.driver.core.SimpleStatement;
-import com.datastax.driver.core.Statement;
-
-import org.apache.zeppelin.display.AngularObjectRegistry;
-import org.apache.zeppelin.display.GUI;
-import org.apache.zeppelin.display.ui.OptionInput.ParamOption;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterException;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -47,302 +43,334 @@ import org.mockito.Captor;
 import org.mockito.Mock;
 import org.mockito.runners.MockitoJUnitRunner;
 
-import scala.Option;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Calendar;
 import java.util.Date;
 import java.util.List;
 
-import org.apache.zeppelin.cassandra.TextBlockHierarchy.*;
+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.SimpleStatement;
+import com.datastax.driver.core.Statement;
+
+import scala.Option;
+
+import org.apache.zeppelin.cassandra.TextBlockHierarchy.AnyBlock;
+import org.apache.zeppelin.cassandra.TextBlockHierarchy.Consistency;
+import org.apache.zeppelin.cassandra.TextBlockHierarchy.DowngradingRetryPolicy$;
+import org.apache.zeppelin.cassandra.TextBlockHierarchy.LoggingDefaultRetryPolicy$;
+import org.apache.zeppelin.cassandra.TextBlockHierarchy.QueryParameters;
+import org.apache.zeppelin.cassandra.TextBlockHierarchy.RequestTimeOut;
+import org.apache.zeppelin.cassandra.TextBlockHierarchy.RetryPolicy;
+import org.apache.zeppelin.cassandra.TextBlockHierarchy.SerialConsistency;
+import org.apache.zeppelin.cassandra.TextBlockHierarchy.SimpleStm;
+import org.apache.zeppelin.cassandra.TextBlockHierarchy.Timestamp;
+import org.apache.zeppelin.display.AngularObjectRegistry;
+import org.apache.zeppelin.display.GUI;
+import org.apache.zeppelin.display.ui.OptionInput.ParamOption;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterException;
 
 @RunWith(MockitoJUnitRunner.class)
 public class InterpreterLogicTest {
-
-    @Rule
-    public ExpectedException expectedException = ExpectedException.none();
-
-    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
-    private InterpreterContext intrContext;
-
-    @Mock
-    private Session session;
-
-    final InterpreterLogic helper = new InterpreterLogic(session);
-
-    @Captor
-    ArgumentCaptor<ParamOption[]> optionsCaptor;
-
-    @Test
-    public void should_parse_input_string_block() throws Exception {
-        //Given
-        String input = "SELECT * FROM users LIMIT 10;";
-
-        //When
-        final List<AnyBlock> anyBlocks = this.<AnyBlock>toJavaList(helper.parseInput(input));
-
-        //Then
-        assertThat(anyBlocks).hasSize(1);
-        assertThat(anyBlocks.get(0)).isInstanceOf(SimpleStm.class);
-    }
-
-    @Test
-    public void should_exception_while_parsing_input() throws Exception {
-        //Given
-        String input = "SELECT * FROM users LIMIT 10";
-
-        //When
-        expectedException.expect(InterpreterException.class);
-        expectedException.expectMessage("Error parsing input:\n" +
-                "\t'SELECT * FROM users LIMIT 10'\n" +
-                "Did you forget to add ; (semi-colon) at the end of each CQL statement ?");
-
-        helper.parseInput(input);
-    }
-
-    @Test
-    public void should_extract_variable_and_default_value() throws Exception {
-        //Given
-        AngularObjectRegistry angularObjectRegistry = new AngularObjectRegistry("cassandra", null);
-        when(intrContext.getAngularObjectRegistry()).thenReturn(angularObjectRegistry);
-        when(intrContext.getGui().input("table", "zeppelin.demo")).thenReturn("zeppelin.demo");
-        when(intrContext.getGui().input("id", "'John'")).thenReturn("'John'");
-
-        //When
-        final String actual = helper.maybeExtractVariables("SELECT * FROM {{table=zeppelin.demo}} WHERE id={{id='John'}}", intrContext);
-
-        //Then
-        assertThat(actual).isEqualTo("SELECT * FROM zeppelin.demo WHERE id='John'");
-    }
-
-    @Test
-    public void should_extract_variable_and_choices() throws Exception {
-        //Given
-        AngularObjectRegistry angularObjectRegistry = new AngularObjectRegistry("cassandra", null);
-        when(intrContext.getAngularObjectRegistry()).thenReturn(angularObjectRegistry);
-        when(intrContext.getGui().select(eq("name"), eq("'Paul'"), optionsCaptor.capture())).thenReturn("'Jack'");
-
-        //When
-        final String actual = helper.maybeExtractVariables("SELECT * FROM zeppelin.artists WHERE name={{name='Paul'|'Jack'|'Smith'}}", intrContext);
-
-        //Then
-        assertThat(actual).isEqualTo("SELECT * FROM zeppelin.artists WHERE name='Jack'");
-        final List<ParamOption> paramOptions = asList(optionsCaptor.getValue());
-        assertThat(paramOptions.get(0).getValue()).isEqualTo("'Paul'");
-        assertThat(paramOptions.get(1).getValue()).isEqualTo("'Jack'");
-        assertThat(paramOptions.get(2).getValue()).isEqualTo("'Smith'");
-    }
-
-    @Test
-    public void should_extract_no_variable() throws Exception {
-        //Given
-        GUI gui = mock(GUI.class);
-        when(intrContext.getGui()).thenReturn(gui);
-
-        //When
-        final String actual = helper.maybeExtractVariables("SELECT * FROM zeppelin.demo", intrContext);
-
-        //Then
-        verifyZeroInteractions(gui);
-        assertThat(actual).isEqualTo("SELECT * FROM zeppelin.demo");
-    }
-
-    @Test
-    public void should_extract_variable_from_angular_object_registry() throws Exception {
-        //Given
-        AngularObjectRegistry angularObjectRegistry = new AngularObjectRegistry("cassandra", null);
-        angularObjectRegistry.add("id", "from_angular_registry", "noteId", "paragraphId");
-        when(intrContext.getAngularObjectRegistry()).thenReturn(angularObjectRegistry);
-        when(intrContext.getNoteId()).thenReturn("noteId");
-        when(intrContext.getParagraphId()).thenReturn("paragraphId");
-
-        //When
-        final String actual = helper.maybeExtractVariables("SELECT * FROM zeppelin.demo WHERE id='{{id=John}}'", intrContext);
-
-        //Then
-        assertThat(actual).isEqualTo("SELECT * FROM zeppelin.demo WHERE id='from_angular_registry'");
-        verify(intrContext, never()).getGui();
-    }
-
-    @Test
-    public void should_error_if_incorrect_variable_definition() throws Exception {
-        //Given
-
-        //When
-        expectedException.expect(ParsingException.class);
-        expectedException.expectMessage("Invalid bound variable definition for '{{table?zeppelin.demo}}' in 'SELECT * FROM {{table?zeppelin.demo}} WHERE id={{id='John'}}'. It should be of form 'variable=defaultValue'");
-
-        //Then
-        helper.maybeExtractVariables("SELECT * FROM {{table?zeppelin.demo}} WHERE id={{id='John'}}", intrContext);
-    }
-
-
-    @Test
-    public void should_extract_consistency_option() throws Exception {
-        //Given
-        List<QueryParameters> options = Arrays.<QueryParameters>asList(new Consistency(ALL), new Consistency(ONE));
-
-        //When
-        final CassandraQueryOptions actual = helper.extractQueryOptions(toScalaList(options));
-
-        //Then
-        assertThat(actual.consistency().get()).isEqualTo(ALL);
-    }
-
-
-    @Test
-    public void should_extract_serial_consistency_option() throws Exception {
-        //Given
-        List<QueryParameters> options = Arrays.<QueryParameters>asList(new SerialConsistency(SERIAL), new SerialConsistency(LOCAL_SERIAL));
-
-        //When
-        final CassandraQueryOptions actual = helper.extractQueryOptions(toScalaList(options));
-
-        //Then
-        assertThat(actual.serialConsistency().get()).isEqualTo(SERIAL);
-    }
-
-    @Test
-    public void should_extract_timestamp_option() throws Exception {
-        //Given
-        List<QueryParameters> options = Arrays.<QueryParameters>asList(new Timestamp(123L), new Timestamp(456L));
-
-        //When
-        final CassandraQueryOptions actual = helper.extractQueryOptions(toScalaList(options));
-
-        //Then
-        assertThat(actual.timestamp().get()).isEqualTo(123L);
-    }
-
-    @Test
-    public void should_extract_retry_policy_option() throws Exception {
-        //Given
-        List<QueryParameters> options = Arrays.<QueryParameters>asList(DowngradingRetryPolicy$.MODULE$, LoggingDefaultRetryPolicy$.MODULE$);
-
-        //When
-        final CassandraQueryOptions actual = helper.extractQueryOptions(toScalaList(options));
-
-        //Then
-        assertThat(actual.retryPolicy().get()).isSameAs(DowngradingRetryPolicy$.MODULE$);
-    }
-
-    @Test
-    public void should_extract_request_timeout_option() throws Exception {
-        //Given
-        List<QueryParameters> options = Arrays.<QueryParameters>asList(new RequestTimeOut(100));
-
-        //When
-        final CassandraQueryOptions actual = helper.extractQueryOptions(toScalaList(options));
-
-        //Then
-        assertThat(actual.requestTimeOut().get()).isEqualTo(100);
-    }
-
-    @Test
-    public void should_generate_simple_statement() throws Exception {
-        //Given
-        String input = "SELECT * FROM users LIMIT 10;";
-        CassandraQueryOptions options = new CassandraQueryOptions(Option.apply(QUORUM),
-                Option.<ConsistencyLevel>empty(),
-                Option.empty(),
-                Option.<RetryPolicy>empty(),
-                Option.empty(),
-                Option.empty());
-
-        //When
-        final SimpleStatement actual = helper.generateSimpleStatement(new SimpleStm(input), options, intrContext);
-
-        //Then
-        assertThat(actual).isNotNull();
-        assertThat(actual.getQueryString()).isEqualTo("SELECT * FROM users LIMIT 10;");
-        assertThat(actual.getConsistencyLevel()).isSameAs(QUORUM);
-    }
-
-    @Test
-    public void should_generate_batch_statement() throws Exception {
-        //Given
-        Statement st1 = new SimpleStatement("SELECT * FROM users LIMIT 10;");
-        Statement st2 = new SimpleStatement("INSERT INTO users(id) VALUES(10);");
-        Statement st3 = new SimpleStatement("UPDATE users SET name = 'John DOE' WHERE id=10;");
-        CassandraQueryOptions options = new CassandraQueryOptions(Option.apply(QUORUM),
-                Option.<ConsistencyLevel>empty(),
-                Option.empty(),
-                Option.<RetryPolicy>empty(),
-                Option.empty(),
-                Option.empty());
-
-        //When
-        BatchStatement actual = helper.generateBatchStatement(UNLOGGED, options, toScalaList(asList(st1, st2, st3)));
-
-        //Then
-        assertThat(actual).isNotNull();
-        final List<Statement> statements = new ArrayList<>(actual.getStatements());
-        assertThat(statements).hasSize(3);
-        assertThat(statements.get(0)).isSameAs(st1);
-        assertThat(statements.get(1)).isSameAs(st2);
-        assertThat(statements.get(2)).isSameAs(st3);
-        assertThat(actual.getConsistencyLevel()).isSameAs(QUORUM);
-    }
-
-    @Test
-    public void should_parse_bound_values() throws Exception {
-        //Given
-        String bs="'jdoe',32,'John DOE',null, true, '2014-06-12 34:00:34'";
-
-        //When
-        final List<String> actual = this.<String>toJavaList(helper.parseBoundValues("ps", bs));
-
-        //Then
-        assertThat(actual).containsExactly("'jdoe'", "32", "'John DOE'",
-                "null", "true", "2014-06-12 34:00:34");
-    }
-
-    @Test
-    public void should_parse_simple_date() throws Exception {
-        //Given
-        String dateString = "2015-07-30 12:00:01";
-
-        //When
-        final Date actual = helper.parseDate(dateString);
-
-        //Then
-        Calendar calendar = Calendar.getInstance();
-        calendar.setTime(actual);
-
-        assertThat(calendar.get(Calendar.YEAR)).isEqualTo(2015);
-        assertThat(calendar.get(Calendar.MONTH)).isEqualTo(Calendar.JULY);
-        assertThat(calendar.get(Calendar.DAY_OF_MONTH)).isEqualTo(30);
-        assertThat(calendar.get(Calendar.HOUR_OF_DAY)).isEqualTo(12);
-        assertThat(calendar.get(Calendar.MINUTE)).isEqualTo(0);
-        assertThat(calendar.get(Calendar.SECOND)).isEqualTo(1);
-    }
-
-    @Test
-    public void should_parse_accurate_date() throws Exception {
-        //Given
-        String dateString = "2015-07-30 12:00:01.123";
-
-        //When
-        final Date actual = helper.parseDate(dateString);
-
-        //Then
-        Calendar calendar = Calendar.getInstance();
-        calendar.setTime(actual);
-
-        assertThat(calendar.get(Calendar.YEAR)).isEqualTo(2015);
-        assertThat(calendar.get(Calendar.MONTH)).isEqualTo(Calendar.JULY);
-        assertThat(calendar.get(Calendar.DAY_OF_MONTH)).isEqualTo(30);
-        assertThat(calendar.get(Calendar.HOUR_OF_DAY)).isEqualTo(12);
-        assertThat(calendar.get(Calendar.MINUTE)).isEqualTo(0);
-        assertThat(calendar.get(Calendar.SECOND)).isEqualTo(1);
-        assertThat(calendar.get(Calendar.MILLISECOND)).isEqualTo(123);
-    }
-
-    private <A> scala.collection.immutable.List<A> toScalaList(java.util.List<A> list)  {
-        return scala.collection.JavaConversions.collectionAsScalaIterable(list).toList();
-    }
-
-    private  <A> java.util.List<A> toJavaList(scala.collection.immutable.List<A> list){
-        return scala.collection.JavaConversions.seqAsJavaList(list);
-    }
-}
\ No newline at end of file
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
+  @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+  private InterpreterContext intrContext;
+
+  @Mock
+  private Session session;
+
+  final InterpreterLogic helper = new InterpreterLogic(session);
+
+  @Captor
+  ArgumentCaptor<ParamOption[]> optionsCaptor;
+
+  @Test
+  public void should_parse_input_string_block() throws Exception {
+    //Given
+    String input = "SELECT * FROM users LIMIT 10;";
+
+    //When
+    final List<AnyBlock> anyBlocks = this.<AnyBlock>toJavaList(helper.parseInput(input));
+
+    //Then
+    assertThat(anyBlocks).hasSize(1);
+    assertThat(anyBlocks.get(0)).isInstanceOf(SimpleStm.class);
+  }
+
+  @Test
+  public void should_exception_while_parsing_input() throws Exception {
+    //Given
+    String input = "SELECT * FROM users LIMIT 10";
+
+    //When
+    expectedException.expect(InterpreterException.class);
+    expectedException.expectMessage("Error parsing input:\n" +
+            "\t'SELECT * FROM users LIMIT 10'\n" +
+            "Did you forget to add ; (semi-colon) at the end of each CQL statement ?");
+
+    helper.parseInput(input);
+  }
+
+  @Test
+  public void should_extract_variable_and_default_value() throws Exception {
+    //Given
+    AngularObjectRegistry angularObjectRegistry = new AngularObjectRegistry("cassandra", null);
+    when(intrContext.getAngularObjectRegistry()).thenReturn(angularObjectRegistry);
+    when(intrContext.getGui().input("table", "zeppelin.demo")).thenReturn("zeppelin.demo");
+    when(intrContext.getGui().input("id", "'John'")).thenReturn("'John'");
+
+    //When
+    final String actual = helper.maybeExtractVariables(
+            "SELECT * FROM {{table=zeppelin.demo}} WHERE id={{id='John'}}", intrContext);
+
+    //Then
+    assertThat(actual).isEqualTo("SELECT * FROM zeppelin.demo WHERE id='John'");
+  }
+
+  @Test
+  public void should_extract_variable_and_choices() throws Exception {
+    //Given
+    AngularObjectRegistry angularObjectRegistry = new AngularObjectRegistry("cassandra", null);
+    when(intrContext.getAngularObjectRegistry()).thenReturn(angularObjectRegistry);
+    when(intrContext.getGui().select(eq("name"), eq("'Paul'"), optionsCaptor.capture()))
+            .thenReturn("'Jack'");
+
+    //When
+    final String actual = helper.maybeExtractVariables(
+            "SELECT * FROM zeppelin.artists WHERE name={{name='Paul'|'Jack'|'Smith'}}",
+            intrContext);
+
+    //Then
+    assertThat(actual).isEqualTo("SELECT * FROM zeppelin.artists WHERE name='Jack'");
+    final List<ParamOption> paramOptions = asList(optionsCaptor.getValue());
+    assertThat(paramOptions.get(0).getValue()).isEqualTo("'Paul'");
+    assertThat(paramOptions.get(1).getValue()).isEqualTo("'Jack'");
+    assertThat(paramOptions.get(2).getValue()).isEqualTo("'Smith'");
+  }
+
+  @Test
+  public void should_extract_no_variable() throws Exception {
+    //Given
+    GUI gui = mock(GUI.class);
+    when(intrContext.getGui()).thenReturn(gui);
+
+    //When
+    final String actual = helper.maybeExtractVariables("SELECT * FROM zeppelin.demo", intrContext);
+
+    //Then
+    verifyZeroInteractions(gui);
+    assertThat(actual).isEqualTo("SELECT * FROM zeppelin.demo");
+  }
+
+  @Test
+  public void should_extract_variable_from_angular_object_registry() throws Exception {
+    //Given
+    AngularObjectRegistry angularObjectRegistry = new AngularObjectRegistry("cassandra", null);
+    angularObjectRegistry.add("id", "from_angular_registry", "noteId", "paragraphId");
+    when(intrContext.getAngularObjectRegistry()).thenReturn(angularObjectRegistry);
+    when(intrContext.getNoteId()).thenReturn("noteId");
+    when(intrContext.getParagraphId()).thenReturn("paragraphId");
+
+    //When
+    final String actual = helper.maybeExtractVariables(
+            "SELECT * FROM zeppelin.demo WHERE id='{{id=John}}'", intrContext);
+
+    //Then
+    assertThat(actual).isEqualTo("SELECT * FROM zeppelin.demo WHERE id='from_angular_registry'");
+    verify(intrContext, never()).getGui();
+  }
+
+  @Test
+  public void should_error_if_incorrect_variable_definition() throws Exception {
+    //Given
+
+    //When
+    expectedException.expect(ParsingException.class);
+    expectedException.expectMessage("Invalid bound variable definition for " +
+            "'{{table?zeppelin.demo}}' in 'SELECT * FROM {{table?zeppelin.demo}} " +
+            "WHERE id={{id='John'}}'. It should be of form 'variable=defaultValue'");
+
+    //Then
+    helper.maybeExtractVariables("SELECT * FROM {{table?zeppelin.demo}} WHERE id={{id='John'}}",
+            intrContext);
+  }
+
+  @Test
+  public void should_extract_consistency_option() throws Exception {
+    //Given
+    List<QueryParameters> options = Arrays.<QueryParameters>asList(new Consistency(ALL),
+            new Consistency(ONE));
+
+    //When
+    final CassandraQueryOptions actual = helper.extractQueryOptions(toScalaList(options));
+
+    //Then
+    assertThat(actual.consistency().get()).isEqualTo(ALL);
+  }
+
+  @Test
+  public void should_extract_serial_consistency_option() throws Exception {
+    //Given
+    List<QueryParameters> options = Arrays.<QueryParameters>asList(new SerialConsistency(SERIAL),
+            new SerialConsistency(LOCAL_SERIAL));
+
+    //When
+    final CassandraQueryOptions actual = helper.extractQueryOptions(toScalaList(options));
+
+    //Then
+    assertThat(actual.serialConsistency().get()).isEqualTo(SERIAL);
+  }
+
+  @Test
+  public void should_extract_timestamp_option() throws Exception {
+    //Given
+    List<QueryParameters> options = Arrays.<QueryParameters>asList(new Timestamp(123L),
+            new Timestamp(456L));
+
+    //When
+    final CassandraQueryOptions actual = helper.extractQueryOptions(toScalaList(options));
+
+    //Then
+    assertThat(actual.timestamp().get()).isEqualTo(123L);
+  }
+
+  @Test
+  public void should_extract_retry_policy_option() throws Exception {
+    //Given
+    List<QueryParameters> options = Arrays.<QueryParameters>asList(DowngradingRetryPolicy$.MODULE$,
+            LoggingDefaultRetryPolicy$.MODULE$);
+
+    //When
+    final CassandraQueryOptions actual = helper.extractQueryOptions(toScalaList(options));
+
+    //Then
+    assertThat(actual.retryPolicy().get()).isSameAs(DowngradingRetryPolicy$.MODULE$);
+  }
+
+  @Test
+  public void should_extract_request_timeout_option() throws Exception {
+    //Given
+    List<QueryParameters> options = Arrays.<QueryParameters>asList(new RequestTimeOut(100));
+
+    //When
+    final CassandraQueryOptions actual = helper.extractQueryOptions(toScalaList(options));
+
+    //Then
+    assertThat(actual.requestTimeOut().get()).isEqualTo(100);
+  }
+
+  @Test
+  public void should_generate_simple_statement() throws Exception {
+    //Given
+    String input = "SELECT * FROM users LIMIT 10;";
+    CassandraQueryOptions options = new CassandraQueryOptions(Option.apply(QUORUM),
+            Option.<ConsistencyLevel>empty(),
+            Option.empty(),
+            Option.<RetryPolicy>empty(),
+            Option.empty(),
+            Option.empty());
+
+    //When
+    final SimpleStatement actual = helper.generateSimpleStatement(new SimpleStm(input), options,
+            intrContext);
+
+    //Then
+    assertThat(actual).isNotNull();
+    assertThat(actual.getQueryString()).isEqualTo("SELECT * FROM users LIMIT 10;");
+    assertThat(actual.getConsistencyLevel()).isSameAs(QUORUM);
+  }
+
+  @Test
+  public void should_generate_batch_statement() throws Exception {
+    //Given
+    Statement st1 = new SimpleStatement("SELECT * FROM users LIMIT 10;");
+    Statement st2 = new SimpleStatement("INSERT INTO users(id) VALUES(10);");
+    Statement st3 = new SimpleStatement("UPDATE users SET name = 'John DOE' WHERE id=10;");
+    CassandraQueryOptions options = new CassandraQueryOptions(Option.apply(QUORUM),
+            Option.<ConsistencyLevel>empty(),
+            Option.empty(),
+            Option.<RetryPolicy>empty(),
+            Option.empty(),
+            Option.empty());
+
+    //When
+    BatchStatement actual = helper.generateBatchStatement(UNLOGGED, options,
+            toScalaList(asList(st1, st2, st3)));
+
+    //Then
+    assertThat(actual).isNotNull();
+    final List<Statement> statements = new ArrayList<>(actual.getStatements());
+    assertThat(statements).hasSize(3);
+    assertThat(statements.get(0)).isSameAs(st1);
+    assertThat(statements.get(1)).isSameAs(st2);
+    assertThat(statements.get(2)).isSameAs(st3);
+    assertThat(actual.getConsistencyLevel()).isSameAs(QUORUM);
+  }
+
+  @Test
+  public void should_parse_bound_values() throws Exception {
+    //Given
+    String bs = "'jdoe',32,'John DOE',null, true, '2014-06-12 34:00:34'";
+
+    //When
+    final List<String> actual = this.<String>toJavaList(helper.parseBoundValues("ps", bs));
+
+    //Then
+    assertThat(actual).containsExactly("'jdoe'", "32", "'John DOE'",
+            "null", "true", "2014-06-12 34:00:34");
+  }
+
+  @Test
+  public void should_parse_simple_date() throws Exception {
+    //Given
+    String dateString = "2015-07-30 12:00:01";
+
+    //When
+    final Date actual = helper.parseDate(dateString);
+
+    //Then
+    Calendar calendar = Calendar.getInstance();
+    calendar.setTime(actual);
+
+    assertThat(calendar.get(Calendar.YEAR)).isEqualTo(2015);
+    assertThat(calendar.get(Calendar.MONTH)).isEqualTo(Calendar.JULY);
+    assertThat(calendar.get(Calendar.DAY_OF_MONTH)).isEqualTo(30);
+    assertThat(calendar.get(Calendar.HOUR_OF_DAY)).isEqualTo(12);
+    assertThat(calendar.get(Calendar.MINUTE)).isEqualTo(0);
+    assertThat(calendar.get(Calendar.SECOND)).isEqualTo(1);
+  }
+
+  @Test
+  public void should_parse_accurate_date() throws Exception {
+    //Given
+    String dateString = "2015-07-30 12:00:01.123";
+
+    //When
+    final Date actual = helper.parseDate(dateString);
+
+    //Then
+    Calendar calendar = Calendar.getInstance();
+    calendar.setTime(actual);
+
+    assertThat(calendar.get(Calendar.YEAR)).isEqualTo(2015);
+    assertThat(calendar.get(Calendar.MONTH)).isEqualTo(Calendar.JULY);
+    assertThat(calendar.get(Calendar.DAY_OF_MONTH)).isEqualTo(30);
+    assertThat(calendar.get(Calendar.HOUR_OF_DAY)).isEqualTo(12);
+    assertThat(calendar.get(Calendar.MINUTE)).isEqualTo(0);
+    assertThat(calendar.get(Calendar.SECOND)).isEqualTo(1);
+    assertThat(calendar.get(Calendar.MILLISECOND)).isEqualTo(123);
+  }
+
+  private <A> scala.collection.immutable.List<A> toScalaList(java.util.List<A> list)  {
+    return scala.collection.JavaConversions.collectionAsScalaIterable(list).toList();
+  }
+
+  private  <A> java.util.List<A> toJavaList(scala.collection.immutable.List<A> list){
+    return scala.collection.JavaConversions.seqAsJavaList(list);
+  }
+}


[2/2] zeppelin git commit: ZEPPELIN-3151. Fixed Checkstyle errors and warnings in cassandra module

Posted by zj...@apache.org.
ZEPPELIN-3151. Fixed Checkstyle errors and warnings in cassandra module

### What is this PR for?
Fixed the Checkstyle errors and warnings in the cassandra module.

### What type of PR is it?
Improvement

### Todos
* [ ] - Task

### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-3151

### How should this be tested?
* CI pass

### Screenshots (if appropriate)

### Questions:
* Do the license files need an update? no
* Are there breaking changes for older versions? no
* Does this need documentation? no

Author: Jan Hentschel <ja...@ultratendency.com>

Closes #2785 from HorizonNet/ZEPPELIN-3151 and squashes the following commits:

646c410 [Jan Hentschel] ZEPPELIN-3151. Fixed Checkstyle errors and warnings in cassandra module


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/8bb888b4
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/8bb888b4
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/8bb888b4

Branch: refs/heads/master
Commit: 8bb888b494b9f611a4e5d81bd8451626eadd7e12
Parents: b335cae
Author: Jan Hentschel <ja...@ultratendency.com>
Authored: Sat Feb 10 12:05:20 2018 +0100
Committer: Jeff Zhang <zj...@apache.org>
Committed: Wed Feb 21 12:07:38 2018 +0800

----------------------------------------------------------------------
 cassandra/pom.xml                               |    7 +
 .../cassandra/CassandraInterpreter.java         |   48 +-
 .../zeppelin/cassandra/ParsingException.java    |    2 +-
 .../zeppelin/cassandra/JavaDriverConfig.scala   |   80 +-
 .../cassandra/CassandraInterpreterTest.java     | 1463 +++++++++---------
 .../cassandra/InterpreterLogicTest.java         |  636 ++++----
 6 files changed, 1157 insertions(+), 1079 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8bb888b4/cassandra/pom.xml
----------------------------------------------------------------------
diff --git a/cassandra/pom.xml b/cassandra/pom.xml
index a07ed82..c5b08f7 100644
--- a/cassandra/pom.xml
+++ b/cassandra/pom.xml
@@ -249,6 +249,13 @@
             <plugin>
                 <artifactId>maven-resources-plugin</artifactId>
             </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-checkstyle-plugin</artifactId>
+                <configuration>
+                    <skip>false</skip>
+                </configuration>
+            </plugin>
         </plugins>
 
     </build>

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8bb888b4/cassandra/src/main/java/org/apache/zeppelin/cassandra/CassandraInterpreter.java
----------------------------------------------------------------------
diff --git a/cassandra/src/main/java/org/apache/zeppelin/cassandra/CassandraInterpreter.java b/cassandra/src/main/java/org/apache/zeppelin/cassandra/CassandraInterpreter.java
index 0f986be..105e271 100644
--- a/cassandra/src/main/java/org/apache/zeppelin/cassandra/CassandraInterpreter.java
+++ b/cassandra/src/main/java/org/apache/zeppelin/cassandra/CassandraInterpreter.java
@@ -16,21 +16,11 @@
  */
 package org.apache.zeppelin.cassandra;
 
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.JdkSSLOptions;
-import com.datastax.driver.core.ProtocolOptions.Compression;
-import com.datastax.driver.core.Session;
-import org.apache.zeppelin.interpreter.Interpreter;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterResult;
-import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
-import org.apache.zeppelin.scheduler.Scheduler;
-import org.apache.zeppelin.scheduler.SchedulerFactory;
+import static java.lang.Integer.parseInt;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.TrustManagerFactory;
 import java.io.InputStream;
 import java.nio.file.Files;
 import java.nio.file.Paths;
@@ -39,10 +29,23 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
 
-import static java.lang.Integer.parseInt;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManagerFactory;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.JdkSSLOptions;
+import com.datastax.driver.core.ProtocolOptions.Compression;
+import com.datastax.driver.core.Session;
+
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
+import org.apache.zeppelin.scheduler.Scheduler;
+import org.apache.zeppelin.scheduler.SchedulerFactory;
 
 /**
- * Interpreter for Apache Cassandra CQL query language
+ * Interpreter for Apache Cassandra CQL query language.
  */
 public class CassandraInterpreter extends Interpreter {
 
@@ -128,14 +131,14 @@ public class CassandraInterpreter extends Interpreter {
   public static final String DEFAULT_CREDENTIAL = "none";
   public static final String DEFAULT_POLICY = "DEFAULT";
   public static final String DEFAULT_PARALLELISM = "10";
-  static String DEFAULT_NEW_CONNECTION_THRESHOLD_LOCAL = "100";
-  static String DEFAULT_NEW_CONNECTION_THRESHOLD_REMOTE = "100";
-  static String DEFAULT_CORE_CONNECTION_PER_HOST_LOCAL = "2";
-  static String DEFAULT_CORE_CONNECTION_PER_HOST_REMOTE = "1";
-  static String DEFAULT_MAX_CONNECTION_PER_HOST_LOCAL = "8";
-  static String DEFAULT_MAX_CONNECTION_PER_HOST_REMOTE = "2";
-  static String DEFAULT_MAX_REQUEST_PER_CONNECTION_LOCAL = "1024";
-  static String DEFAULT_MAX_REQUEST_PER_CONNECTION_REMOTE = "256";
+  static String defaultNewConnectionThresholdLocal = "100";
+  static String defaultNewConnectionThresholdRemote = "100";
+  static String defaultCoreConnectionPerHostLocal = "2";
+  static String defaultCoreConnectionPerHostRemote = "1";
+  static String defaultMaxConnectionPerHostLocal = "8";
+  static String defaultMaxConnectionPerHostRemote = "2";
+  static String defaultMaxRequestPerConnectionLocal = "1024";
+  static String defaultMaxRequestPerConnectionRemote = "256";
   public static final String DEFAULT_IDLE_TIMEOUT = "120";
   public static final String DEFAULT_POOL_TIMEOUT = "5000";
   public static final String DEFAULT_HEARTBEAT_INTERVAL = "30";
@@ -244,7 +247,6 @@ public class CassandraInterpreter extends Interpreter {
 
   @Override
   public void cancel(InterpreterContext context) {
-
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8bb888b4/cassandra/src/main/java/org/apache/zeppelin/cassandra/ParsingException.java
----------------------------------------------------------------------
diff --git a/cassandra/src/main/java/org/apache/zeppelin/cassandra/ParsingException.java b/cassandra/src/main/java/org/apache/zeppelin/cassandra/ParsingException.java
index da9bb0c..b87e8b2 100644
--- a/cassandra/src/main/java/org/apache/zeppelin/cassandra/ParsingException.java
+++ b/cassandra/src/main/java/org/apache/zeppelin/cassandra/ParsingException.java
@@ -17,7 +17,7 @@
 package org.apache.zeppelin.cassandra;
 
 /**
- * Parsing Exception for Cassandra CQL statement
+ * Parsing Exception for Cassandra CQL statement.
  */
 public class ParsingException extends RuntimeException{
   public ParsingException(String message) {

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8bb888b4/cassandra/src/main/scala/org/apache/zeppelin/cassandra/JavaDriverConfig.scala
----------------------------------------------------------------------
diff --git a/cassandra/src/main/scala/org/apache/zeppelin/cassandra/JavaDriverConfig.scala b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/JavaDriverConfig.scala
index 5b2dbef..865d89f 100644
--- a/cassandra/src/main/scala/org/apache/zeppelin/cassandra/JavaDriverConfig.scala
+++ b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/JavaDriverConfig.scala
@@ -178,54 +178,54 @@ class JavaDriverConfig {
 
     protocolVersion match {
       case "1" =>
-        DEFAULT_MAX_CONNECTION_PER_HOST_LOCAL = "8"
-        DEFAULT_MAX_CONNECTION_PER_HOST_REMOTE = "2"
-        DEFAULT_CORE_CONNECTION_PER_HOST_LOCAL = "2"
-        DEFAULT_CORE_CONNECTION_PER_HOST_REMOTE = "1"
-        DEFAULT_NEW_CONNECTION_THRESHOLD_LOCAL = "100"
-        DEFAULT_NEW_CONNECTION_THRESHOLD_REMOTE = "1"
-        DEFAULT_MAX_REQUEST_PER_CONNECTION_LOCAL = "128"
-        DEFAULT_MAX_REQUEST_PER_CONNECTION_REMOTE = "128"
+        defaultMaxConnectionPerHostLocal = "8"
+        defaultMaxConnectionPerHostRemote = "2"
+        defaultCoreConnectionPerHostLocal = "2"
+        defaultCoreConnectionPerHostRemote = "1"
+        defaultNewConnectionThresholdLocal = "100"
+        defaultNewConnectionThresholdRemote = "1"
+        defaultMaxRequestPerConnectionLocal = "128"
+        defaultMaxRequestPerConnectionRemote = "128"
         return ProtocolVersion.V1
       case "2" =>
-        DEFAULT_MAX_CONNECTION_PER_HOST_LOCAL = "8"
-        DEFAULT_MAX_CONNECTION_PER_HOST_REMOTE = "2"
-        DEFAULT_CORE_CONNECTION_PER_HOST_LOCAL = "2"
-        DEFAULT_CORE_CONNECTION_PER_HOST_REMOTE = "1"
-        DEFAULT_NEW_CONNECTION_THRESHOLD_LOCAL = "100"
-        DEFAULT_NEW_CONNECTION_THRESHOLD_REMOTE = "1"
-        DEFAULT_MAX_REQUEST_PER_CONNECTION_LOCAL = "128"
-        DEFAULT_MAX_REQUEST_PER_CONNECTION_REMOTE = "128"
+        defaultMaxConnectionPerHostLocal = "8"
+        defaultMaxConnectionPerHostRemote = "2"
+        defaultCoreConnectionPerHostLocal = "2"
+        defaultCoreConnectionPerHostRemote = "1"
+        defaultNewConnectionThresholdLocal = "100"
+        defaultNewConnectionThresholdRemote = "1"
+        defaultMaxRequestPerConnectionLocal = "128"
+        defaultMaxRequestPerConnectionRemote = "128"
         return ProtocolVersion.V2
       case "3" =>
-        DEFAULT_MAX_CONNECTION_PER_HOST_LOCAL = "1"
-        DEFAULT_MAX_CONNECTION_PER_HOST_REMOTE = "1"
-        DEFAULT_CORE_CONNECTION_PER_HOST_LOCAL = "1"
-        DEFAULT_CORE_CONNECTION_PER_HOST_REMOTE = "1"
-        DEFAULT_NEW_CONNECTION_THRESHOLD_LOCAL = "800"
-        DEFAULT_NEW_CONNECTION_THRESHOLD_REMOTE = "200"
-        DEFAULT_MAX_REQUEST_PER_CONNECTION_LOCAL = "1024"
-        DEFAULT_MAX_REQUEST_PER_CONNECTION_REMOTE = "256"
+        defaultMaxConnectionPerHostLocal = "1"
+        defaultMaxConnectionPerHostRemote = "1"
+        defaultCoreConnectionPerHostLocal = "1"
+        defaultCoreConnectionPerHostRemote = "1"
+        defaultNewConnectionThresholdLocal = "800"
+        defaultNewConnectionThresholdRemote = "200"
+        defaultMaxRequestPerConnectionLocal = "1024"
+        defaultMaxRequestPerConnectionRemote = "256"
         return ProtocolVersion.V3
       case "4" =>
-        DEFAULT_MAX_CONNECTION_PER_HOST_LOCAL = "1"
-        DEFAULT_MAX_CONNECTION_PER_HOST_REMOTE = "1"
-        DEFAULT_CORE_CONNECTION_PER_HOST_LOCAL = "1"
-        DEFAULT_CORE_CONNECTION_PER_HOST_REMOTE = "1"
-        DEFAULT_NEW_CONNECTION_THRESHOLD_LOCAL = "800"
-        DEFAULT_NEW_CONNECTION_THRESHOLD_REMOTE = "200"
-        DEFAULT_MAX_REQUEST_PER_CONNECTION_LOCAL = "1024"
-        DEFAULT_MAX_REQUEST_PER_CONNECTION_REMOTE = "256"
+        defaultMaxConnectionPerHostLocal = "1"
+        defaultMaxConnectionPerHostRemote = "1"
+        defaultCoreConnectionPerHostLocal = "1"
+        defaultCoreConnectionPerHostRemote = "1"
+        defaultNewConnectionThresholdLocal = "800"
+        defaultNewConnectionThresholdRemote = "200"
+        defaultMaxRequestPerConnectionLocal = "1024"
+        defaultMaxRequestPerConnectionRemote = "256"
         return ProtocolVersion.V4
       case _ =>
-        DEFAULT_MAX_CONNECTION_PER_HOST_LOCAL = "1"
-        DEFAULT_MAX_CONNECTION_PER_HOST_REMOTE = "1"
-        DEFAULT_CORE_CONNECTION_PER_HOST_LOCAL = "1"
-        DEFAULT_CORE_CONNECTION_PER_HOST_REMOTE = "1"
-        DEFAULT_NEW_CONNECTION_THRESHOLD_LOCAL = "800"
-        DEFAULT_NEW_CONNECTION_THRESHOLD_REMOTE = "200"
-        DEFAULT_MAX_REQUEST_PER_CONNECTION_LOCAL = "1024"
-        DEFAULT_MAX_REQUEST_PER_CONNECTION_REMOTE = "256"
+        defaultMaxConnectionPerHostLocal = "1"
+        defaultMaxConnectionPerHostRemote = "1"
+        defaultCoreConnectionPerHostLocal = "1"
+        defaultCoreConnectionPerHostRemote = "1"
+        defaultNewConnectionThresholdLocal = "800"
+        defaultNewConnectionThresholdRemote = "200"
+        defaultMaxRequestPerConnectionLocal = "1024"
+        defaultMaxRequestPerConnectionRemote = "256"
         return ProtocolVersion.NEWEST_SUPPORTED
     }
   }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8bb888b4/cassandra/src/test/java/org/apache/zeppelin/cassandra/CassandraInterpreterTest.java
----------------------------------------------------------------------
diff --git a/cassandra/src/test/java/org/apache/zeppelin/cassandra/CassandraInterpreterTest.java b/cassandra/src/test/java/org/apache/zeppelin/cassandra/CassandraInterpreterTest.java
index cf392bb..df5e4ac 100644
--- a/cassandra/src/test/java/org/apache/zeppelin/cassandra/CassandraInterpreterTest.java
+++ b/cassandra/src/test/java/org/apache/zeppelin/cassandra/CassandraInterpreterTest.java
@@ -16,18 +16,57 @@
  */
 package org.apache.zeppelin.cassandra;
 
-import static com.datastax.driver.core.ProtocolOptions.DEFAULT_MAX_SCHEMA_AGREEMENT_WAIT_SECONDS;
 import static com.google.common.collect.FluentIterable.from;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.when;
+
+import static com.datastax.driver.core.ProtocolOptions.DEFAULT_MAX_SCHEMA_AGREEMENT_WAIT_SECONDS;
+
 import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_CLUSTER_NAME;
 import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_COMPRESSION_PROTOCOL;
 import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_CREDENTIALS_PASSWORD;
 import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_CREDENTIALS_USERNAME;
 import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_HOSTS;
+import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_LOAD_BALANCING_POLICY;
+import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_MAX_SCHEMA_AGREEMENT_WAIT_SECONDS;
+import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_POOLING_CORE_CONNECTION_PER_HOST_LOCAL;
+import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_POOLING_CORE_CONNECTION_PER_HOST_REMOTE;
+import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_POOLING_HEARTBEAT_INTERVAL_SECONDS;
+import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_POOLING_IDLE_TIMEOUT_SECONDS;
+import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_POOLING_MAX_CONNECTION_PER_HOST_LOCAL;
+import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_POOLING_MAX_CONNECTION_PER_HOST_REMOTE;
+import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_POOLING_MAX_REQUESTS_PER_CONNECTION_LOCAL;
+import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_POOLING_MAX_REQUESTS_PER_CONNECTION_REMOTE;
+import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_POOLING_NEW_CONNECTION_THRESHOLD_LOCAL;
+import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_POOLING_NEW_CONNECTION_THRESHOLD_REMOTE;
+import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_POOLING_POOL_TIMEOUT_MILLIS;
 import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_PORT;
 import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_PROTOCOL_VERSION;
-import static org.apache.zeppelin.cassandra.CassandraInterpreter.*;
-import static org.assertj.core.api.Assertions.*;
-import static org.mockito.Mockito.when;
+import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_QUERY_DEFAULT_CONSISTENCY;
+import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_QUERY_DEFAULT_FETCH_SIZE;
+import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_QUERY_DEFAULT_SERIAL_CONSISTENCY;
+import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_RECONNECTION_POLICY;
+import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_RETRY_POLICY;
+import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_SOCKET_CONNECTION_TIMEOUT_MILLIS;
+import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_SOCKET_READ_TIMEOUT_MILLIS;
+import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_SOCKET_TCP_NO_DELAY;
+import static org.apache.zeppelin.cassandra.CassandraInterpreter.CASSANDRA_SPECULATIVE_EXECUTION_POLICY;
+
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Answers;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.Properties;
 
 import com.datastax.driver.core.Cluster;
 import com.datastax.driver.core.ProtocolVersion;
@@ -40,714 +79,716 @@ import org.apache.zeppelin.interpreter.Interpreter;
 import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.apache.zeppelin.interpreter.InterpreterResult.Code;
-import org.junit.*;
-import org.junit.runner.RunWith;
-import org.mockito.Answers;
-import org.mockito.Mock;
-import org.mockito.runners.MockitoJUnitRunner;
-
-import java.io.BufferedReader;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.util.Properties;
 
 @RunWith(MockitoJUnitRunner.class)
 public class CassandraInterpreterTest {
-
-    private static final String ARTISTS_TABLE = "zeppelin.artists";
-
-    public static Session session = CassandraEmbeddedServerBuilder
-        .noEntityPackages()
-        .withKeyspaceName("zeppelin")
-        .withScript("prepare_schema.cql")
-        .withScript("prepare_data.cql")
-        .withProtocolVersion(ProtocolVersion.V3)
-        .buildNativeSessionOnly();
-
-    private static CassandraInterpreter interpreter;
-
-    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
-    private InterpreterContext intrContext;
-
-    @BeforeClass
-    public static void setUp() {
-        Properties properties = new Properties();
-        final Cluster cluster = session.getCluster();
-
-        properties.setProperty(CASSANDRA_CLUSTER_NAME, cluster.getClusterName());
-        properties.setProperty(CASSANDRA_COMPRESSION_PROTOCOL, "NONE");
-        properties.setProperty(CASSANDRA_CREDENTIALS_USERNAME, "none");
-        properties.setProperty(CASSANDRA_CREDENTIALS_PASSWORD, "none");
-
-        properties.setProperty(CASSANDRA_PROTOCOL_VERSION, "3");
-        properties.setProperty(CASSANDRA_LOAD_BALANCING_POLICY, "DEFAULT");
-        properties.setProperty(CASSANDRA_RETRY_POLICY, "DEFAULT");
-        properties.setProperty(CASSANDRA_RECONNECTION_POLICY, "DEFAULT");
-        properties.setProperty(CASSANDRA_SPECULATIVE_EXECUTION_POLICY, "DEFAULT");
-
-        properties.setProperty(CASSANDRA_MAX_SCHEMA_AGREEMENT_WAIT_SECONDS,
-                DEFAULT_MAX_SCHEMA_AGREEMENT_WAIT_SECONDS + "");
-
-        properties.setProperty(CASSANDRA_POOLING_NEW_CONNECTION_THRESHOLD_LOCAL, "100");
-        properties.setProperty(CASSANDRA_POOLING_NEW_CONNECTION_THRESHOLD_REMOTE, "100");
-        properties.setProperty(CASSANDRA_POOLING_CORE_CONNECTION_PER_HOST_LOCAL, "2");
-        properties.setProperty(CASSANDRA_POOLING_CORE_CONNECTION_PER_HOST_REMOTE, "1");
-        properties.setProperty(CASSANDRA_POOLING_MAX_CONNECTION_PER_HOST_LOCAL, "8");
-        properties.setProperty(CASSANDRA_POOLING_MAX_CONNECTION_PER_HOST_REMOTE, "2");
-        properties.setProperty(CASSANDRA_POOLING_MAX_REQUESTS_PER_CONNECTION_LOCAL, "1024");
-        properties.setProperty(CASSANDRA_POOLING_MAX_REQUESTS_PER_CONNECTION_REMOTE, "256");
-
-        properties.setProperty(CASSANDRA_POOLING_IDLE_TIMEOUT_SECONDS, "120");
-        properties.setProperty(CASSANDRA_POOLING_POOL_TIMEOUT_MILLIS, "5000");
-        properties.setProperty(CASSANDRA_POOLING_HEARTBEAT_INTERVAL_SECONDS, "30");
-
-        properties.setProperty(CASSANDRA_QUERY_DEFAULT_CONSISTENCY, "ONE");
-        properties.setProperty(CASSANDRA_QUERY_DEFAULT_SERIAL_CONSISTENCY, "SERIAL");
-        properties.setProperty(CASSANDRA_QUERY_DEFAULT_FETCH_SIZE, "5000");
-
-        properties.setProperty(CASSANDRA_SOCKET_CONNECTION_TIMEOUT_MILLIS, "5000");
-        properties.setProperty(CASSANDRA_SOCKET_READ_TIMEOUT_MILLIS, "12000");
-        properties.setProperty(CASSANDRA_SOCKET_TCP_NO_DELAY, "true");
-
-        properties.setProperty(CASSANDRA_HOSTS, from(cluster.getMetadata().getAllHosts()).first().get().getAddress().getHostAddress());
-        properties.setProperty(CASSANDRA_PORT, cluster.getConfiguration().getProtocolOptions().getPort()+"");
-        interpreter = new CassandraInterpreter(properties);
-        interpreter.open();
-    }
-
-    @AfterClass
-    public static void tearDown() {
-        interpreter.close();
-    }
-
-    @Before
-    public void prepareContext() {
-        when(intrContext.getParagraphTitle()).thenReturn("Paragraph1");
-    }
-
-    @Test
-    public void should_create_cluster_and_session_upon_call_to_open() throws Exception {
-        assertThat(interpreter.cluster).isNotNull();
-        assertThat(interpreter.cluster.getClusterName()).isEqualTo(session.getCluster().getClusterName());
-        assertThat(interpreter.session).isNotNull();
-        assertThat(interpreter.helper).isNotNull();
-    }
-
-    @Test
-    public void should_interpret_simple_select() throws Exception {
-        //Given
-
-        //When
-        final InterpreterResult actual = interpreter.interpret("SELECT * FROM " + ARTISTS_TABLE + " LIMIT 10;", intrContext);
-
-        //Then
-        assertThat(actual).isNotNull();
-        assertThat(actual.code()).isEqualTo(Code.SUCCESS);
-        assertThat(actual.message().get(0).getData()).isEqualTo("name\tborn\tcountry\tdied\tgender\tstyles\ttype\n" +
-                "Bogdan Raczynski\t1977-01-01\tPoland\tnull\tMale\t[Dance, Electro]\tPerson\n" +
-                "Krishna Das\t1947-05-31\tUSA\tnull\tMale\t[Unknown]\tPerson\n" +
-                "Sheryl Crow\t1962-02-11\tUSA\tnull\tFemale\t[Classic, Rock, Country, Blues, Pop, Folk]\tPerson\n" +
-                "Doof\t1968-08-31\tUnited Kingdom\tnull\tnull\t[Unknown]\tPerson\n" +
-                "House of Large Sizes\t1986-01-01\tUSA\t2003\tnull\t[Unknown]\tGroup\n" +
-                "Fanfarlo\t2006-01-01\tUnited Kingdom\tnull\tnull\t[Rock, Indie, Pop, Classic]\tGroup\n" +
-                "Jeff Beck\t1944-06-24\tUnited Kingdom\tnull\tMale\t[Rock, Pop, Classic]\tPerson\n" +
-                "Los Paranoias\tnull\tUnknown\tnull\tnull\t[Unknown]\tnull\n" +
-                "…And You Will Know Us by the Trail of Dead\t1994-01-01\tUSA\tnull\tnull\t[Rock, Pop, Classic]\tGroup\n");
-
-    }
-
-    @Test
-    public void should_interpret_select_statement() throws Exception {
-        //Given
-
-        //When
-        final InterpreterResult actual = interpreter.interpret("SELECT * FROM " + ARTISTS_TABLE + " LIMIT 2;", intrContext);
-
-        //Then
-        assertThat(actual).isNotNull();
-        assertThat(actual.code()).isEqualTo(Code.SUCCESS);
-        assertThat(actual.message().get(0).getData()).isEqualTo("name\tborn\tcountry\tdied\tgender\tstyles\ttype\n" +
-                "Bogdan Raczynski\t1977-01-01\tPoland\tnull\tMale\t[Dance, Electro]\tPerson\n" +
-                "Krishna Das\t1947-05-31\tUSA\tnull\tMale\t[Unknown]\tPerson\n");
-
-    }
-
-    @Test
-    public void should_interpret_multiple_statements_with_single_line_logged_batch() throws Exception {
-        //Given
-        String statements = "CREATE TABLE IF NOT EXISTS zeppelin.albums(\n" +
-                "    title text PRIMARY KEY,\n" +
-                "    artist text,\n" +
-                "    year int\n" +
-                ");\n" +
-                "BEGIN BATCH"+
-                "   INSERT INTO zeppelin.albums(title,artist,year) VALUES('The Impossible Dream EP','Carter the Unstoppable Sex Machine',1992);"+
-                "   INSERT INTO zeppelin.albums(title,artist,year) VALUES('The Way You Are','Tears for Fears',1983);"+
-                "   INSERT INTO zeppelin.albums(title,artist,year) VALUES('Primitive','Soulfly',2003);"+
-                "APPLY BATCH;\n"+
-                "SELECT * FROM zeppelin.albums;";
-        //When
-        final InterpreterResult actual = interpreter.interpret(statements, intrContext);
-
-        //Then
-        assertThat(actual.code()).isEqualTo(Code.SUCCESS);
-        assertThat(actual.message().get(0).getData()).isEqualTo("title\tartist\tyear\n" +
-                "The Impossible Dream EP\tCarter the Unstoppable Sex Machine\t1992\n" +
-                "The Way You Are\tTears for Fears\t1983\n" +
-                "Primitive\tSoulfly\t2003\n");
-    }
+  private static final String ARTISTS_TABLE = "zeppelin.artists";
+
+  public static Session session = CassandraEmbeddedServerBuilder
+          .noEntityPackages()
+          .withKeyspaceName("zeppelin")
+          .withScript("prepare_schema.cql")
+          .withScript("prepare_data.cql")
+          .withProtocolVersion(ProtocolVersion.V3)
+          .buildNativeSessionOnly();
+
+  private static CassandraInterpreter interpreter;
+
+  @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+  private InterpreterContext intrContext;
+
+  @BeforeClass
+  public static void setUp() {
+    Properties properties = new Properties();
+    final Cluster cluster = session.getCluster();
+
+    properties.setProperty(CASSANDRA_CLUSTER_NAME, cluster.getClusterName());
+    properties.setProperty(CASSANDRA_COMPRESSION_PROTOCOL, "NONE");
+    properties.setProperty(CASSANDRA_CREDENTIALS_USERNAME, "none");
+    properties.setProperty(CASSANDRA_CREDENTIALS_PASSWORD, "none");
+
+    properties.setProperty(CASSANDRA_PROTOCOL_VERSION, "3");
+    properties.setProperty(CASSANDRA_LOAD_BALANCING_POLICY, "DEFAULT");
+    properties.setProperty(CASSANDRA_RETRY_POLICY, "DEFAULT");
+    properties.setProperty(CASSANDRA_RECONNECTION_POLICY, "DEFAULT");
+    properties.setProperty(CASSANDRA_SPECULATIVE_EXECUTION_POLICY, "DEFAULT");
+
+    properties.setProperty(CASSANDRA_MAX_SCHEMA_AGREEMENT_WAIT_SECONDS,
+            DEFAULT_MAX_SCHEMA_AGREEMENT_WAIT_SECONDS + "");
+
+    properties.setProperty(CASSANDRA_POOLING_NEW_CONNECTION_THRESHOLD_LOCAL, "100");
+    properties.setProperty(CASSANDRA_POOLING_NEW_CONNECTION_THRESHOLD_REMOTE, "100");
+    properties.setProperty(CASSANDRA_POOLING_CORE_CONNECTION_PER_HOST_LOCAL, "2");
+    properties.setProperty(CASSANDRA_POOLING_CORE_CONNECTION_PER_HOST_REMOTE, "1");
+    properties.setProperty(CASSANDRA_POOLING_MAX_CONNECTION_PER_HOST_LOCAL, "8");
+    properties.setProperty(CASSANDRA_POOLING_MAX_CONNECTION_PER_HOST_REMOTE, "2");
+    properties.setProperty(CASSANDRA_POOLING_MAX_REQUESTS_PER_CONNECTION_LOCAL, "1024");
+    properties.setProperty(CASSANDRA_POOLING_MAX_REQUESTS_PER_CONNECTION_REMOTE, "256");
+
+    properties.setProperty(CASSANDRA_POOLING_IDLE_TIMEOUT_SECONDS, "120");
+    properties.setProperty(CASSANDRA_POOLING_POOL_TIMEOUT_MILLIS, "5000");
+    properties.setProperty(CASSANDRA_POOLING_HEARTBEAT_INTERVAL_SECONDS, "30");
+
+    properties.setProperty(CASSANDRA_QUERY_DEFAULT_CONSISTENCY, "ONE");
+    properties.setProperty(CASSANDRA_QUERY_DEFAULT_SERIAL_CONSISTENCY, "SERIAL");
+    properties.setProperty(CASSANDRA_QUERY_DEFAULT_FETCH_SIZE, "5000");
+
+    properties.setProperty(CASSANDRA_SOCKET_CONNECTION_TIMEOUT_MILLIS, "5000");
+    properties.setProperty(CASSANDRA_SOCKET_READ_TIMEOUT_MILLIS, "12000");
+    properties.setProperty(CASSANDRA_SOCKET_TCP_NO_DELAY, "true");
+
+    properties.setProperty(CASSANDRA_HOSTS, from(cluster.getMetadata().getAllHosts()).first()
+            .get().getAddress().getHostAddress());
+    properties.setProperty(CASSANDRA_PORT, cluster.getConfiguration().getProtocolOptions()
+            .getPort() + "");
+    interpreter = new CassandraInterpreter(properties);
+    interpreter.open();
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    interpreter.close();
+  }
+
+  @Before
+  public void prepareContext() {
+    when(intrContext.getParagraphTitle()).thenReturn("Paragraph1");
+  }
+
+  @Test
+  public void should_create_cluster_and_session_upon_call_to_open() throws Exception {
+    assertThat(interpreter.cluster).isNotNull();
+    assertThat(interpreter.cluster.getClusterName()).isEqualTo(session.getCluster()
+            .getClusterName());
+    assertThat(interpreter.session).isNotNull();
+    assertThat(interpreter.helper).isNotNull();
+  }
+
+  @Test
+  public void should_interpret_simple_select() throws Exception {
+    //Given
+
+    //When
+    final InterpreterResult actual = interpreter.interpret("SELECT * FROM " + ARTISTS_TABLE +
+            " LIMIT 10;", intrContext);
+
+    //Then
+    assertThat(actual).isNotNull();
+    assertThat(actual.code()).isEqualTo(Code.SUCCESS);
+    assertThat(actual.message().get(0).getData()).isEqualTo("name\tborn\tcountry\tdied\tgender\t" +
+            "styles\ttype\n" +
+            "Bogdan Raczynski\t1977-01-01\tPoland\tnull\tMale\t[Dance, Electro]\tPerson\n" +
+            "Krishna Das\t1947-05-31\tUSA\tnull\tMale\t[Unknown]\tPerson\n" +
+            "Sheryl Crow\t1962-02-11\tUSA\tnull\tFemale\t" +
+            "[Classic, Rock, Country, Blues, Pop, Folk]\tPerson\n" +
+            "Doof\t1968-08-31\tUnited Kingdom\tnull\tnull\t[Unknown]\tPerson\n" +
+            "House of Large Sizes\t1986-01-01\tUSA\t2003\tnull\t[Unknown]\tGroup\n" +
+            "Fanfarlo\t2006-01-01\tUnited Kingdom\tnull\tnull\t" +
+            "[Rock, Indie, Pop, Classic]\tGroup\n" +
+            "Jeff Beck\t1944-06-24\tUnited Kingdom\tnull\tMale\t[Rock, Pop, Classic]\tPerson\n" +
+            "Los Paranoias\tnull\tUnknown\tnull\tnull\t[Unknown]\tnull\n" +
+            "…And You Will Know Us by the Trail of Dead\t1994-01-01\tUSA\tnull\tnull\t" +
+            "[Rock, Pop, Classic]\tGroup\n");
+  }
+
+  @Test
+  public void should_interpret_select_statement() throws Exception {
+    //Given
+
+    //When
+    final InterpreterResult actual = interpreter.interpret("SELECT * FROM " + ARTISTS_TABLE +
+            " LIMIT 2;", intrContext);
+
+    //Then
+    assertThat(actual).isNotNull();
+    assertThat(actual.code()).isEqualTo(Code.SUCCESS);
+    assertThat(actual.message().get(0).getData())
+            .isEqualTo("name\tborn\tcountry\tdied\tgender\tstyles\ttype\n" +
+            "Bogdan Raczynski\t1977-01-01\tPoland\tnull\tMale\t[Dance, Electro]\tPerson\n" +
+            "Krishna Das\t1947-05-31\tUSA\tnull\tMale\t[Unknown]\tPerson\n");
+  }
+
+  @Test
+  public void should_interpret_multiple_statements_with_single_line_logged_batch() {
+    //Given
+    String statements = "CREATE TABLE IF NOT EXISTS zeppelin.albums(\n" +
+            "    title text PRIMARY KEY,\n" +
+            "    artist text,\n" +
+            "    year int\n" +
+            ");\n" +
+            "BEGIN BATCH" +
+            "   INSERT INTO zeppelin.albums(title,artist,year) " +
+            "VALUES('The Impossible Dream EP','Carter the Unstoppable Sex Machine',1992);" +
+            "   INSERT INTO zeppelin.albums(title,artist,year) " +
+            "VALUES('The Way You Are','Tears for Fears',1983);" +
+            "   INSERT INTO zeppelin.albums(title,artist,year) " +
+            "VALUES('Primitive','Soulfly',2003);" +
+            "APPLY BATCH;\n" +
+            "SELECT * FROM zeppelin.albums;";
+    //When
+    final InterpreterResult actual = interpreter.interpret(statements, intrContext);
+
+    //Then
+    assertThat(actual.code()).isEqualTo(Code.SUCCESS);
+    assertThat(actual.message().get(0).getData()).isEqualTo("title\tartist\tyear\n" +
+            "The Impossible Dream EP\tCarter the Unstoppable Sex Machine\t1992\n" +
+            "The Way You Are\tTears for Fears\t1983\n" +
+            "Primitive\tSoulfly\t2003\n");
+  }
     
-    @Test
-    public void should_throw_statement_not_having_semi_colon() throws Exception {
-        //Given
-        String statement = "SELECT * zeppelin.albums";
-
-        //When
-        final InterpreterResult actual = interpreter.interpret(statement, intrContext);
-
-        //Then
-        assertThat(actual.code()).isEqualTo(Code.ERROR);
-        assertThat(actual.message().get(0).getData())
-                .contains("Error parsing input:\n" +
-                        "\t'SELECT * zeppelin.albums'\n" +
-                        "Did you forget to add ; (semi-colon) at the end of each CQL statement ?");
-    }
-
-    @Test
-    public void should_validate_statement() throws Exception {
-        //Given
-        String statement = "SELECT * zeppelin.albums;";
-
-        //When
-        final InterpreterResult actual = interpreter.interpret(statement, intrContext);
-
-        //Then
-        assertThat(actual.code()).isEqualTo(Code.ERROR);
-        assertThat(actual.message().get(0).getData()).contains("line 1:9 missing K_FROM at 'zeppelin' (SELECT * [zeppelin]....)");
-    }
-
-    @Test
-    public void should_execute_statement_with_consistency_option() throws Exception {
-        //Given
-        String statement = "@consistency=THREE\n" +
-                "SELECT * FROM zeppelin.artists LIMIT 1;";
-
-        //When
-        final InterpreterResult actual = interpreter.interpret(statement, intrContext);
-
-        //Then
-        assertThat(actual.code()).isEqualTo(Code.ERROR);
-        assertThat(actual.message().get(0).getData())
-                .contains("Not enough replicas available for query at consistency THREE (3 required but only 1 alive)");
-    }
-
-    @Test
-    public void should_execute_statement_with_serial_consistency_option() throws Exception {
-        //Given
-        String statement = "@serialConsistency=SERIAL\n" +
-                "SELECT * FROM zeppelin.artists LIMIT 1;";
-
-        //When
-        final InterpreterResult actual = interpreter.interpret(statement, intrContext);
-
-        //Then
-        assertThat(actual.code()).isEqualTo(Code.SUCCESS);
-    }
-
-    @Test
-    public void should_execute_statement_with_timestamp_option() throws Exception {
-        //Given
-        String statement1 = "INSERT INTO zeppelin.ts(key,val) VALUES('k','v1');";
-        String statement2 = "@timestamp=15\n" +
-                "INSERT INTO zeppelin.ts(key,val) VALUES('k','v2');";
-
-        // Insert v1 with current timestamp
-        interpreter.interpret(statement1, intrContext);
-
-        Thread.sleep(1);
-
-        //When
-        // Insert v2 with past timestamp
-        interpreter.interpret(statement2, intrContext);
-        final String actual = session.execute("SELECT * FROM zeppelin.ts LIMIT 1").one().getString("val");
-
-        //Then
-        assertThat(actual).isEqualTo("v1");
-    }
-
-    @Test
-    public void should_execute_statement_with_retry_policy() throws Exception {
-        //Given
-        String statement = "@retryPolicy=" + interpreter.LOGGING_DOWNGRADING_RETRY + "\n" +
-                "@consistency=THREE\n" +
-                "SELECT * FROM zeppelin.artists LIMIT 1;";
-
-        //When
-        final InterpreterResult actual = interpreter.interpret(statement, intrContext);
-
-        //Then
-        assertThat(actual.code()).isEqualTo(Code.SUCCESS);
-    }
-
-    @Test
-    public void should_execute_statement_with_request_timeout() throws Exception {
-        //Given
-        String statement = "@requestTimeOut=10000000\n" +
-                "SELECT * FROM zeppelin.artists;";
-
-        //When
-        final InterpreterResult actual = interpreter.interpret(statement, intrContext);
-
-        //Then
-        assertThat(actual.code()).isEqualTo(Code.SUCCESS);
-    }
-
-    @Test
-    public void should_execute_prepared_and_bound_statements() throws Exception {
-        //Given
-        String queries = "@prepare[ps]=INSERT INTO zeppelin.prepared(key,val) VALUES(?,?)\n" +
-                "@prepare[select]=SELECT * FROM zeppelin.prepared WHERE key=:key\n" +
-                "@bind[ps]='myKey','myValue'\n" +
-                "@bind[select]='myKey'";
-
-        //When
-        final InterpreterResult actual = interpreter.interpret(queries, intrContext);
-
-        //Then
-        assertThat(actual.code()).isEqualTo(Code.SUCCESS);
-        assertThat(actual.message().get(0).getData()).isEqualTo("key\tval\n" +
-                "myKey\tmyValue\n");
-    }
-
-    @Test
-    public void should_execute_bound_statement() throws Exception {
-        //Given
-        String queries = "@prepare[users_insert]=INSERT INTO zeppelin.users" +
-                "(login,firstname,lastname,addresses,location)" +
-                "VALUES(:login,:fn,:ln,:addresses,:loc)\n" +
-                "@bind[users_insert]='jdoe','John','DOE'," +
-                "{street_number: 3, street_name: 'Beverly Hills Bld', zip_code: 90209," +
-                " country: 'USA', extra_info: ['Right on the hills','Next to the post box']," +
-                " phone_numbers: {'home': 2016778524, 'office': 2015790847}}," +
-                "('USA', 90209, 'Beverly Hills')\n" +
-                "SELECT * FROM zeppelin.users WHERE login='jdoe';";
-        //When
-        final InterpreterResult actual = interpreter.interpret(queries, intrContext);
-
-        //Then
-        assertThat(actual.code()).isEqualTo(Code.SUCCESS);
-        assertThat(actual.message().get(0).getData()).isEqualTo(
-                "login\taddresses\tage\tdeceased\tfirstname\tlast_update\tlastname\tlocation\n" +
-                        "jdoe\t" +
-                        "{street_number:3,street_name:'Beverly Hills Bld',zip_code:90209," +
-                        "country:'USA',extra_info:['Right on the hills','Next to the post box']," +
-                        "phone_numbers:{'office':2015790847,'home':2016778524}}\tnull\t" +
-                        "null\t" +
-                        "John\t" +
-                        "null\t" +
-                        "DOE\t" +
-                        "('USA',90209,'Beverly Hills')\n");
-    }
-
-    @Test
-    public void should_exception_when_executing_unknown_bound_statement() throws Exception {
-        //Given
-        String queries = "@bind[select_users]='jdoe'";
-
-        //When
-        final InterpreterResult actual = interpreter.interpret(queries, intrContext);
-
-        //Then
-        assertThat(actual.code()).isEqualTo(Code.ERROR);
-        assertThat(actual.message().get(0).getData())
-                .isEqualTo("The statement 'select_users' can not be bound to values. " +
-                        "Are you sure you did prepare it with @prepare[select_users] ?");
-    }
-
-    @Test
-    public void should_extract_variable_from_statement() throws Exception {
-        //Given
-        AngularObjectRegistry angularObjectRegistry = new AngularObjectRegistry("cassandra", null);
-        when(intrContext.getAngularObjectRegistry()).thenReturn(angularObjectRegistry);
-        when(intrContext.getGui().input("login", "hsue")).thenReturn("hsue");
-        when(intrContext.getGui().input("age", "27")).thenReturn("27");
-
-        String queries = "@prepare[test_insert_with_variable]=" +
-                "INSERT INTO zeppelin.users(login,firstname,lastname,age) VALUES(?,?,?,?)\n" +
-                "@bind[test_insert_with_variable]='{{login=hsue}}','Helen','SUE',{{age=27}}\n" +
-                "SELECT firstname,lastname,age FROM zeppelin.users WHERE login='hsue';";
-        //When
-        final InterpreterResult actual = interpreter.interpret(queries, intrContext);
-
-        //Then
-        assertThat(actual.code()).isEqualTo(Code.SUCCESS);
-        assertThat(actual.message().get(0).getData()).isEqualTo("firstname\tlastname\tage\n" +
-                "Helen\tSUE\t27\n");
-
-    }
-
-    @Test
-    public void should_just_prepare_statement() throws Exception {
-        //Given
-        String queries = "@prepare[just_prepare]=SELECT name,country,styles FROM zeppelin.artists LIMIT 3";
-        final String expected = reformatHtml(
-                readTestResource("/scalate/NoResult.html"));
-
-        //When
-        final InterpreterResult actual = interpreter.interpret(queries, intrContext);
-
-        //Then
-        assertThat(actual.code()).isEqualTo(Code.SUCCESS);
-        assertThat(reformatHtml(actual.message().get(0).getData())).isEqualTo(expected);
-    }
-
-    @Test
-    public void should_execute_bound_statement_with_no_bound_value() throws Exception {
-        //Given
-        String queries = "@prepare[select_no_bound_value]=SELECT name,country,styles FROM zeppelin.artists LIMIT 3\n" +
-                "@bind[select_no_bound_value]";
-
-        //When
-        final InterpreterResult actual = interpreter.interpret(queries, intrContext);
-
-        //Then
-        assertThat(actual.code()).isEqualTo(Code.SUCCESS);
-        assertThat(actual.message().get(0).getData()).isEqualTo("name\tcountry\tstyles\n" +
-                "Bogdan Raczynski\tPoland\t[Dance, Electro]\n" +
-                "Krishna Das\tUSA\t[Unknown]\n" +
-                "Sheryl Crow\tUSA\t[Classic, Rock, Country, Blues, Pop, Folk]\n");
-
-    }
-
-    @Test
-    public void should_parse_date_value() throws Exception {
-        //Given
-        String queries = "@prepare[parse_date]=INSERT INTO zeppelin.users(login,last_update) VALUES(?,?)\n" +
-                "@bind[parse_date]='last_update','2015-07-30 12:00:01'\n" +
-                "SELECT last_update FROM zeppelin.users WHERE login='last_update';";
-        //When
-        final InterpreterResult actual = interpreter.interpret(queries, intrContext);
-
-        //Then
-        assertThat(actual.code()).isEqualTo(Code.SUCCESS);
-        assertThat(actual.message().get(0).getData()).contains("last_update\n" +
-                "Thu Jul 30 12:00:01");
-    }
-
-    @Test
-    public void should_bind_null_value() throws Exception {
-        //Given
-        String queries = "@prepare[bind_null]=INSERT INTO zeppelin.users(login,firstname,lastname) VALUES(?,?,?)\n" +
-                "@bind[bind_null]='bind_null',null,'NULL'\n" +
-                "SELECT firstname,lastname FROM zeppelin.users WHERE login='bind_null';";
-        //When
-        final InterpreterResult actual = interpreter.interpret(queries, intrContext);
-
-        //Then
-        assertThat(actual.code()).isEqualTo(Code.SUCCESS);
-        assertThat(actual.message().get(0).getData()).isEqualTo("firstname\tlastname\n" +
-                "null\tNULL\n");
-    }
-
-    @Test
-    public void should_bind_boolean_value() throws Exception {
-        //Given
-        String queries = "@prepare[bind_boolean]=INSERT INTO zeppelin.users(login,deceased) VALUES(?,?)\n" +
-                "@bind[bind_boolean]='bind_bool',false\n" +
-                "SELECT login,deceased FROM zeppelin.users WHERE login='bind_bool';";
-        //When
-        final InterpreterResult actual = interpreter.interpret(queries, intrContext);
-
-        //Then
-        assertThat(actual.code()).isEqualTo(Code.SUCCESS);
-        assertThat(actual.message().get(0).getData()).isEqualTo("login\tdeceased\n" +
-                "bind_bool\tfalse\n");
-    }
-
-    @Test
-    public void should_fail_when_executing_a_removed_prepared_statement() throws Exception {
-        //Given
-        String prepare_first = "@prepare[to_be_removed]=INSERT INTO zeppelin.users(login,deceased) VALUES(?,?)";
-        interpreter.interpret(prepare_first, intrContext);
-        String remove_prepared = "@remove_prepare[to_be_removed]\n" +
-                "@bind[to_be_removed]='bind_bool'";
-
-        //When
-        final InterpreterResult actual = interpreter.interpret(remove_prepared, intrContext);
-
-        //Then
-        assertThat(actual.code()).isEqualTo(Code.ERROR);
-        assertThat(actual.message().get(0).getData()).isEqualTo("The statement 'to_be_removed' can not be bound to values. " +
-                "Are you sure you did prepare it with @prepare[to_be_removed] ?");
-    }
-
-    @Test
-    public void should_display_statistics_for_non_select_statement() throws Exception {
-        //Given
-        String query = "USE zeppelin;\nCREATE TABLE IF NOT EXISTS no_select(id int PRIMARY KEY);";
-        final String rawResult = reformatHtml(readTestResource("/scalate/NoResultWithExecutionInfo.html"));
-
-        //When
-        final InterpreterResult actual = interpreter.interpret(query, intrContext);
-        final Cluster cluster = session.getCluster();
-        final int port = cluster.getConfiguration().getProtocolOptions().getPort();
-        final String address = cluster.getMetadata().getAllHosts().iterator().next()
-                .getAddress().getHostAddress()
-                .replaceAll("/", "").replaceAll("\\[", "").replaceAll("\\]","");
-        //Then
-        final String expected = rawResult.replaceAll("TRIED_HOSTS", address+":"+port)
-                .replaceAll("QUERIED_HOSTS", address +":"+port);
-
-
-        assertThat(actual.code()).isEqualTo(Code.SUCCESS);
-        assertThat(reformatHtml(actual.message().get(0).getData())).isEqualTo(expected);
-    }
-
-    @Test
-    public void should_error_and_display_stack_trace() throws Exception {
-        //Given
-        String query = "@consistency=THREE\n" +
-                "SELECT * FROM zeppelin.users LIMIT 3;";
-        //When
-        final InterpreterResult actual = interpreter.interpret(query, intrContext);
-
-        //Then
-        assertThat(actual.code()).isEqualTo(Code.ERROR);
-        assertThat(actual.message().get(0).getData()).contains("All host(s) tried for query failed");
-    }
-
-    @Test
-    public void should_describe_cluster() throws Exception {
-        //Given
-
-        String query = "DESCRIBE CLUSTER;";
-        final String expected = reformatHtml(
-                readTestResource("/scalate/DescribeCluster.html"));
-
-        //When
-        final InterpreterResult actual = interpreter.interpret(query, intrContext);
-
-        //Then
-        assertThat(actual.code()).isEqualTo(Code.SUCCESS);
-
-        assertThat(reformatHtml(actual.message().get(0).getData())).isEqualTo(expected);
-    }
-
-    @Test
-    public void should_describe_keyspaces() throws Exception {
-        //Given
-        String query = "DESCRIBE KEYSPACES;";
-        final String expected = reformatHtml(
-                readTestResource("/scalate/DescribeKeyspaces.html"));
-
-        //When
-        final InterpreterResult actual = interpreter.interpret(query, intrContext);
-
-        //Then
-        assertThat(actual.code()).isEqualTo(Code.SUCCESS);
-
-        assertThat(reformatHtml(actual.message().get(0).getData())).isEqualTo(expected);
-    }
-
-    @Test
-         public void should_describe_keyspace() throws Exception {
-        //Given
-        String query = "DESCRIBE KEYSPACE live_data;";
-        final String expected = reformatHtml(
-                readTestResource("/scalate/DescribeKeyspace_live_data.html"));
-
-        //When
-        final InterpreterResult actual = interpreter.interpret(query, intrContext);
-
-        //Then
-        assertThat(actual.code()).isEqualTo(Code.SUCCESS);
-
-        assertThat(reformatHtml(actual.message().get(0).getData())).isEqualTo(expected);
-    }
-
-    @Test
-    @Ignore
-    //TODO activate test when using Java 8 and C* 3.x
-    public void should_describe_function() throws Exception {
-        //Given
-        Properties properties = new Properties();
-        properties.setProperty(CASSANDRA_HOSTS, "127.0.0.1");
-        properties.setProperty(CASSANDRA_PORT,  "9042");
-        Interpreter interpreter = new CassandraInterpreter(properties);
-        interpreter.open();
-
-        String createFunction = "CREATE FUNCTION zeppelin.maxof(val1 int,val2 int) " +
-                "RETURNS NULL ON NULL INPUT " +
-                "RETURNS int " +
-                "LANGUAGE java " +
-                "AS $$" +
-                "    return Math.max(val1, val2);\n" +
-                "$$;";
-        interpreter.interpret(createFunction, intrContext);
-        String query = "DESCRIBE FUNCTION zeppelin.maxOf;";
-
-        //When
-        final InterpreterResult actual = interpreter.interpret(query, intrContext);
-
-        //Then
-        assertThat(actual.code()).isEqualTo(Code.SUCCESS);
-        assertThat(actual.message()).isEqualTo("xxxxx");
-    }
-
-    @Test
-    @Ignore
-    //TODO activate test when using Java 8 and C* 3.x
-    public void should_describe_aggregate() throws Exception {
-        //Given
-        Properties properties = new Properties();
-        properties.setProperty(CASSANDRA_HOSTS, "127.0.0.1");
-        properties.setProperty(CASSANDRA_PORT,  "9042");
-        Interpreter interpreter = new CassandraInterpreter(properties);
-        interpreter.open();
-
-        final String query = "DESCRIBE AGGREGATES;";
-
-        //When
-        final InterpreterResult actual = interpreter.interpret(query, intrContext);
-
-        //Then
-        assertThat(actual.code()).isEqualTo(Code.SUCCESS);
-
-    }
-
-    @Test
-    @Ignore
-    //TODO activate test when using Java 8 and C* 3.x
-    public void should_describe_materialized_view() throws Exception {
-        //Given
-        Properties properties = new Properties();
-        properties.setProperty(CASSANDRA_HOSTS, "127.0.0.1");
-        properties.setProperty(CASSANDRA_PORT,  "9042");
-        Interpreter interpreter = new CassandraInterpreter(properties);
-        interpreter.open();
-
-        final String query = "DESCRIBE MATERIALIZED VIEWS;";
-
-        //When
-        final InterpreterResult actual = interpreter.interpret(query, intrContext);
-
-        //Then
-        assertThat(actual.code()).isEqualTo(Code.SUCCESS);
-    }
-
-    @Test
-    public void should_describe_table() throws Exception {
-        //Given
-        String query = "DESCRIBE TABLE live_data.complex_table;";
-        final String expected = reformatHtml(
-                readTestResource("/scalate/DescribeTable_live_data_complex_table.html"));
-
-        //When
-        final InterpreterResult actual = interpreter.interpret(query, intrContext);
-
-        //Then
-        assertThat(actual.code()).isEqualTo(Code.SUCCESS);
-
-        assertThat(reformatHtml(actual.message().get(0).getData())).isEqualTo(expected);
-    }
-
-    @Test
-    public void should_describe_udt() throws Exception {
-        //Given
-        String query = "DESCRIBE TYPE live_data.address;";
-        final String expected = reformatHtml(
-                readTestResource("/scalate/DescribeType_live_data_address.html"));
-
-        //When
-        final InterpreterResult actual = interpreter.interpret(query, intrContext);
-
-        //Then
-        assertThat(actual.code()).isEqualTo(Code.SUCCESS);
-
-        assertThat(reformatHtml(actual.message().get(0).getData())).isEqualTo(expected);
-    }
-
-    @Test
-    public void should_describe_udt_withing_logged_in_keyspace() throws Exception {
-        //Given
-        String query = "USE live_data;\n" +
-                "DESCRIBE TYPE address;";
-        final String expected = reformatHtml(
-                readTestResource("/scalate/DescribeType_live_data_address_within_current_keyspace.html"));
-
-        //When
-        final InterpreterResult actual = interpreter.interpret(query, intrContext);
-
-        //Then
-        assertThat(actual.code()).isEqualTo(Code.SUCCESS);
-
-        assertThat(reformatHtml(actual.message().get(0).getData())).isEqualTo(expected);
-    }
-
-    @Test
-    public void should_error_describing_non_existing_table() throws Exception {
-        //Given
-        String query = "USE system;\n" +
-                "DESCRIBE TABLE complex_table;";
-
-        //When
-        final InterpreterResult actual = interpreter.interpret(query, intrContext);
-
-        //Then
-        assertThat(actual.code()).isEqualTo(Code.ERROR);
-        assertThat(actual.message().get(0).getData()).contains("Cannot find table system.complex_table");
-    }
-
-    @Test
-    public void should_error_describing_non_existing_udt() throws Exception {
-        //Given
-        String query = "USE system;\n" +
-                "DESCRIBE TYPE address;";
-
-        //When
-        final InterpreterResult actual = interpreter.interpret(query, intrContext);
-
-        //Then
-        assertThat(actual.code()).isEqualTo(Code.ERROR);
-        assertThat(actual.message().get(0).getData()).contains("Cannot find type system.address");
-    }
-
-    @Test
-    public void should_show_help() throws Exception {
-        //Given
-        String query = "HELP;";
-        final String expected = reformatHtml(readTestResource("/scalate/Help.html"));
-
-        //When
-        final InterpreterResult actual = interpreter.interpret(query, intrContext);
-
-        //Then
-        assertThat(actual.code()).isEqualTo(Code.SUCCESS);
-        assertThat(reformatHtml(actual.message().get(0).getData())).isEqualTo(expected);
-    }
-
-    private static String reformatHtml(String rawHtml) {
-        return  rawHtml
-                .replaceAll("\\s*\n\\s*","")
-                .replaceAll(">\\s+<", "><")
-                .replaceAll("(?s)data-target=\"#[a-f0-9-]+(?:_asCQL|_indices_asCQL)?\"", "")
-                .replaceAll("(?s)id=\"[a-f0-9-]+(?:_asCQL|_indices_asCQL)?\"", "")
-                .trim();
-    }
-
-    private static String readTestResource(String testResource) {
-        StringBuilder builder = new StringBuilder();
-        InputStream stream = testResource.getClass().getResourceAsStream(testResource);
-
-        try (BufferedReader br = new BufferedReader(new InputStreamReader(stream))) {
-            String line;
-            while ((line = br.readLine()) != null) {
-                builder.append(line).append("\n");
-            }
-        } catch (Exception ex) {
-            throw  new RuntimeException(ex);
-        }
-
-        return builder.toString();
-    }
-}
\ No newline at end of file
+  @Test
+  public void should_throw_statement_not_having_semi_colon() throws Exception {
+    //Given
+    String statement = "SELECT * zeppelin.albums";
+
+    //When
+    final InterpreterResult actual = interpreter.interpret(statement, intrContext);
+
+    //Then
+    assertThat(actual.code()).isEqualTo(Code.ERROR);
+    assertThat(actual.message().get(0).getData())
+            .contains("Error parsing input:\n" +
+                    "\t'SELECT * zeppelin.albums'\n" +
+                    "Did you forget to add ; (semi-colon) at the end of each CQL statement ?");
+  }
+
+  @Test
+  public void should_validate_statement() throws Exception {
+    //Given
+    String statement = "SELECT * zeppelin.albums;";
+
+    //When
+    final InterpreterResult actual = interpreter.interpret(statement, intrContext);
+
+    //Then
+    assertThat(actual.code()).isEqualTo(Code.ERROR);
+    assertThat(actual.message().get(0).getData())
+            .contains("line 1:9 missing K_FROM at 'zeppelin' (SELECT * [zeppelin]....)");
+  }
+
+  @Test
+  public void should_execute_statement_with_consistency_option() throws Exception {
+    //Given
+    String statement = "@consistency=THREE\n" +
+            "SELECT * FROM zeppelin.artists LIMIT 1;";
+
+    //When
+    final InterpreterResult actual = interpreter.interpret(statement, intrContext);
+
+    //Then
+    assertThat(actual.code()).isEqualTo(Code.ERROR);
+    assertThat(actual.message().get(0).getData())
+            .contains("Not enough replicas available for query at consistency THREE (3 required " +
+                    "but only 1 alive)");
+  }
+
+  @Test
+  public void should_execute_statement_with_serial_consistency_option() throws Exception {
+    //Given
+    String statement = "@serialConsistency=SERIAL\n" +
+            "SELECT * FROM zeppelin.artists LIMIT 1;";
+
+    //When
+    final InterpreterResult actual = interpreter.interpret(statement, intrContext);
+
+    //Then
+    assertThat(actual.code()).isEqualTo(Code.SUCCESS);
+  }
+
+  @Test
+  public void should_execute_statement_with_timestamp_option() throws Exception {
+    //Given
+    String statement1 = "INSERT INTO zeppelin.ts(key,val) VALUES('k','v1');";
+    String statement2 = "@timestamp=15\n" +
+            "INSERT INTO zeppelin.ts(key,val) VALUES('k','v2');";
+
+    // Insert v1 with current timestamp
+    interpreter.interpret(statement1, intrContext);
+
+    Thread.sleep(1);
+
+    //When
+    // Insert v2 with past timestamp
+    interpreter.interpret(statement2, intrContext);
+    final String actual = session.execute("SELECT * FROM zeppelin.ts LIMIT 1").one()
+            .getString("val");
+
+    //Then
+    assertThat(actual).isEqualTo("v1");
+  }
+
+  @Test
+  public void should_execute_statement_with_retry_policy() throws Exception {
+    //Given
+    String statement = "@retryPolicy=" + interpreter.LOGGING_DOWNGRADING_RETRY + "\n" +
+            "@consistency=THREE\n" +
+            "SELECT * FROM zeppelin.artists LIMIT 1;";
+
+    //When
+    final InterpreterResult actual = interpreter.interpret(statement, intrContext);
+
+    //Then
+    assertThat(actual.code()).isEqualTo(Code.SUCCESS);
+  }
+
+  @Test
+  public void should_execute_statement_with_request_timeout() throws Exception {
+    //Given
+    String statement = "@requestTimeOut=10000000\n" +
+            "SELECT * FROM zeppelin.artists;";
+
+    //When
+    final InterpreterResult actual = interpreter.interpret(statement, intrContext);
+
+    //Then
+    assertThat(actual.code()).isEqualTo(Code.SUCCESS);
+  }
+
+  @Test
+  public void should_execute_prepared_and_bound_statements() throws Exception {
+    //Given
+    String queries = "@prepare[ps]=INSERT INTO zeppelin.prepared(key,val) VALUES(?,?)\n" +
+            "@prepare[select]=SELECT * FROM zeppelin.prepared WHERE key=:key\n" +
+            "@bind[ps]='myKey','myValue'\n" +
+            "@bind[select]='myKey'";
+
+    //When
+    final InterpreterResult actual = interpreter.interpret(queries, intrContext);
+
+    //Then
+    assertThat(actual.code()).isEqualTo(Code.SUCCESS);
+    assertThat(actual.message().get(0).getData()).isEqualTo("key\tval\n" +
+            "myKey\tmyValue\n");
+  }
+
+  @Test
+  public void should_execute_bound_statement() throws Exception {
+    //Given
+    String queries = "@prepare[users_insert]=INSERT INTO zeppelin.users" +
+            "(login,firstname,lastname,addresses,location)" +
+            "VALUES(:login,:fn,:ln,:addresses,:loc)\n" +
+            "@bind[users_insert]='jdoe','John','DOE'," +
+            "{street_number: 3, street_name: 'Beverly Hills Bld', zip_code: 90209," +
+            " country: 'USA', extra_info: ['Right on the hills','Next to the post box']," +
+            " phone_numbers: {'home': 2016778524, 'office': 2015790847}}," +
+            "('USA', 90209, 'Beverly Hills')\n" +
+            "SELECT * FROM zeppelin.users WHERE login='jdoe';";
+    //When
+    final InterpreterResult actual = interpreter.interpret(queries, intrContext);
+
+    //Then
+    assertThat(actual.code()).isEqualTo(Code.SUCCESS);
+    assertThat(actual.message().get(0).getData()).isEqualTo(
+            "login\taddresses\tage\tdeceased\tfirstname\tlast_update\tlastname\tlocation\n" +
+                    "jdoe\t" +
+                    "{street_number:3,street_name:'Beverly Hills Bld',zip_code:90209," +
+                    "country:'USA',extra_info:['Right on the hills','Next to the post box']," +
+                    "phone_numbers:{'office':2015790847,'home':2016778524}}\tnull\t" +
+                    "null\t" +
+                    "John\t" +
+                    "null\t" +
+                    "DOE\t" +
+                    "('USA',90209,'Beverly Hills')\n");
+  }
+
+  @Test
+  public void should_exception_when_executing_unknown_bound_statement() throws Exception {
+    //Given
+    String queries = "@bind[select_users]='jdoe'";
+
+    //When
+    final InterpreterResult actual = interpreter.interpret(queries, intrContext);
+
+    //Then
+    assertThat(actual.code()).isEqualTo(Code.ERROR);
+    assertThat(actual.message().get(0).getData())
+            .isEqualTo("The statement 'select_users' can not be bound to values. " +
+                    "Are you sure you did prepare it with @prepare[select_users] ?");
+  }
+
+  @Test
+  public void should_extract_variable_from_statement() throws Exception {
+    //Given
+    AngularObjectRegistry angularObjectRegistry = new AngularObjectRegistry("cassandra", null);
+    when(intrContext.getAngularObjectRegistry()).thenReturn(angularObjectRegistry);
+    when(intrContext.getGui().input("login", "hsue")).thenReturn("hsue");
+    when(intrContext.getGui().input("age", "27")).thenReturn("27");
+
+    String queries = "@prepare[test_insert_with_variable]=" +
+            "INSERT INTO zeppelin.users(login,firstname,lastname,age) VALUES(?,?,?,?)\n" +
+            "@bind[test_insert_with_variable]='{{login=hsue}}','Helen','SUE',{{age=27}}\n" +
+            "SELECT firstname,lastname,age FROM zeppelin.users WHERE login='hsue';";
+    //When
+    final InterpreterResult actual = interpreter.interpret(queries, intrContext);
+
+    //Then
+    assertThat(actual.code()).isEqualTo(Code.SUCCESS);
+    assertThat(actual.message().get(0).getData()).isEqualTo("firstname\tlastname\tage\n" +
+            "Helen\tSUE\t27\n");
+  }
+
+  @Test
+  public void should_just_prepare_statement() throws Exception {
+    //Given
+    String queries = "@prepare[just_prepare]=SELECT name,country,styles " +
+            "FROM zeppelin.artists LIMIT 3";
+    final String expected = reformatHtml(
+            readTestResource("/scalate/NoResult.html"));
+
+    //When
+    final InterpreterResult actual = interpreter.interpret(queries, intrContext);
+
+    //Then
+    assertThat(actual.code()).isEqualTo(Code.SUCCESS);
+    assertThat(reformatHtml(actual.message().get(0).getData())).isEqualTo(expected);
+  }
+
+  @Test
+  public void should_execute_bound_statement_with_no_bound_value() throws Exception {
+    //Given
+    String queries = "@prepare[select_no_bound_value]=SELECT name,country,styles " +
+            "FROM zeppelin.artists LIMIT 3\n" +
+            "@bind[select_no_bound_value]";
+
+    //When
+    final InterpreterResult actual = interpreter.interpret(queries, intrContext);
+
+    //Then
+    assertThat(actual.code()).isEqualTo(Code.SUCCESS);
+    assertThat(actual.message().get(0).getData()).isEqualTo("name\tcountry\tstyles\n" +
+            "Bogdan Raczynski\tPoland\t[Dance, Electro]\n" +
+            "Krishna Das\tUSA\t[Unknown]\n" +
+            "Sheryl Crow\tUSA\t[Classic, Rock, Country, Blues, Pop, Folk]\n");
+  }
+
+  @Test
+  public void should_parse_date_value() throws Exception {
+    //Given
+    String queries = "@prepare[parse_date]=INSERT INTO zeppelin.users(login,last_update) " +
+            "VALUES(?,?)\n" +
+            "@bind[parse_date]='last_update','2015-07-30 12:00:01'\n" +
+            "SELECT last_update FROM zeppelin.users WHERE login='last_update';";
+    //When
+    final InterpreterResult actual = interpreter.interpret(queries, intrContext);
+
+    //Then
+    assertThat(actual.code()).isEqualTo(Code.SUCCESS);
+    assertThat(actual.message().get(0).getData()).contains("last_update\n" +
+            "Thu Jul 30 12:00:01");
+  }
+
+  @Test
+  public void should_bind_null_value() throws Exception {
+    //Given
+    String queries = "@prepare[bind_null]=INSERT INTO zeppelin.users(login,firstname,lastname) " +
+            "VALUES(?,?,?)\n" +
+            "@bind[bind_null]='bind_null',null,'NULL'\n" +
+            "SELECT firstname,lastname FROM zeppelin.users WHERE login='bind_null';";
+    //When
+    final InterpreterResult actual = interpreter.interpret(queries, intrContext);
+
+    //Then
+    assertThat(actual.code()).isEqualTo(Code.SUCCESS);
+    assertThat(actual.message().get(0).getData()).isEqualTo("firstname\tlastname\n" +
+            "null\tNULL\n");
+  }
+
+  @Test
+  public void should_bind_boolean_value() throws Exception {
+    //Given
+    String queries = "@prepare[bind_boolean]=INSERT INTO zeppelin.users(login,deceased) " +
+            "VALUES(?,?)\n" +
+            "@bind[bind_boolean]='bind_bool',false\n" +
+            "SELECT login,deceased FROM zeppelin.users WHERE login='bind_bool';";
+    //When
+    final InterpreterResult actual = interpreter.interpret(queries, intrContext);
+
+    //Then
+    assertThat(actual.code()).isEqualTo(Code.SUCCESS);
+    assertThat(actual.message().get(0).getData()).isEqualTo("login\tdeceased\n" +
+            "bind_bool\tfalse\n");
+  }
+
+  @Test
+  public void should_fail_when_executing_a_removed_prepared_statement() throws Exception {
+    //Given
+    String prepareFirst = "@prepare[to_be_removed]=INSERT INTO zeppelin.users(login,deceased) " +
+            "VALUES(?,?)";
+    interpreter.interpret(prepareFirst, intrContext);
+    String removePrepared = "@remove_prepare[to_be_removed]\n" +
+            "@bind[to_be_removed]='bind_bool'";
+
+    //When
+    final InterpreterResult actual = interpreter.interpret(removePrepared, intrContext);
+
+    //Then
+    assertThat(actual.code()).isEqualTo(Code.ERROR);
+    assertThat(actual.message().get(0).getData()).isEqualTo("The statement 'to_be_removed' can " +
+            "not be bound to values. Are you sure you did prepare it with " +
+            "@prepare[to_be_removed] ?");
+  }
+
+  @Test
+  public void should_display_statistics_for_non_select_statement() throws Exception {
+    //Given
+    String query = "USE zeppelin;\nCREATE TABLE IF NOT EXISTS no_select(id int PRIMARY KEY);";
+    final String rawResult = reformatHtml(readTestResource(
+            "/scalate/NoResultWithExecutionInfo.html"));
+
+    //When
+    final InterpreterResult actual = interpreter.interpret(query, intrContext);
+    final Cluster cluster = session.getCluster();
+    final int port = cluster.getConfiguration().getProtocolOptions().getPort();
+    final String address = cluster.getMetadata().getAllHosts().iterator().next()
+            .getAddress().getHostAddress()
+            .replaceAll("/", "").replaceAll("\\[", "").replaceAll("\\]", "");
+    //Then
+    final String expected = rawResult.replaceAll("TRIED_HOSTS", address + ":" + port)
+            .replaceAll("QUERIED_HOSTS", address + ":" + port);
+
+    assertThat(actual.code()).isEqualTo(Code.SUCCESS);
+    assertThat(reformatHtml(actual.message().get(0).getData())).isEqualTo(expected);
+  }
+
+  @Test
+  public void should_error_and_display_stack_trace() throws Exception {
+    //Given
+    String query = "@consistency=THREE\n" +
+            "SELECT * FROM zeppelin.users LIMIT 3;";
+    //When
+    final InterpreterResult actual = interpreter.interpret(query, intrContext);
+
+    //Then
+    assertThat(actual.code()).isEqualTo(Code.ERROR);
+    assertThat(actual.message().get(0).getData()).contains("All host(s) tried for query failed");
+  }
+
+  @Test
+  public void should_describe_cluster() throws Exception {
+    //Given
+
+    String query = "DESCRIBE CLUSTER;";
+    final String expected = reformatHtml(
+            readTestResource("/scalate/DescribeCluster.html"));
+
+    //When
+    final InterpreterResult actual = interpreter.interpret(query, intrContext);
+
+    //Then
+    assertThat(actual.code()).isEqualTo(Code.SUCCESS);
+    assertThat(reformatHtml(actual.message().get(0).getData())).isEqualTo(expected);
+  }
+
+  @Test
+  public void should_describe_keyspaces() throws Exception {
+    //Given
+    String query = "DESCRIBE KEYSPACES;";
+    final String expected = reformatHtml(
+            readTestResource("/scalate/DescribeKeyspaces.html"));
+
+    //When
+    final InterpreterResult actual = interpreter.interpret(query, intrContext);
+
+    //Then
+    assertThat(actual.code()).isEqualTo(Code.SUCCESS);
+    assertThat(reformatHtml(actual.message().get(0).getData())).isEqualTo(expected);
+  }
+
+  @Test
+  public void should_describe_keyspace() throws Exception {
+    //Given
+    String query = "DESCRIBE KEYSPACE live_data;";
+    final String expected = reformatHtml(
+            readTestResource("/scalate/DescribeKeyspace_live_data.html"));
+
+    //When
+    final InterpreterResult actual = interpreter.interpret(query, intrContext);
+
+    //Then
+    assertThat(actual.code()).isEqualTo(Code.SUCCESS);
+    assertThat(reformatHtml(actual.message().get(0).getData())).isEqualTo(expected);
+  }
+
+  /**
+   * Creates the {@code zeppelin.maxof} function through a dedicated interpreter and then
+   * describes it. Currently ignored (see the TODO below).
+   *
+   * <p>The dedicated interpreter is closed in a {@code finally} block so that enabling the
+   * test does not leave an opened interpreter behind.
+   */
+  @Test
+  @Ignore
+  //TODO(n.a.) activate test when using Java 8 and C* 3.x
+  public void should_describe_function() throws Exception {
+    //Given
+    Properties properties = new Properties();
+    properties.setProperty(CASSANDRA_HOSTS, "127.0.0.1");
+    properties.setProperty(CASSANDRA_PORT,  "9042");
+    // Named differently from the shared `interpreter` fixture to avoid shadowing it.
+    Interpreter localInterpreter = new CassandraInterpreter(properties);
+    localInterpreter.open();
+
+    try {
+      String createFunction = "CREATE FUNCTION zeppelin.maxof(val1 int,val2 int) " +
+              "RETURNS NULL ON NULL INPUT " +
+              "RETURNS int " +
+              "LANGUAGE java " +
+              "AS $$" +
+              "    return Math.max(val1, val2);\n" +
+              "$$;";
+      localInterpreter.interpret(createFunction, intrContext);
+      String query = "DESCRIBE FUNCTION zeppelin.maxOf;";
+
+      //When
+      final InterpreterResult actual = localInterpreter.interpret(query, intrContext);
+
+      //Then
+      assertThat(actual.code()).isEqualTo(Code.SUCCESS);
+      // NOTE(review): "xxxxx" is a placeholder expectation; replace it with the real rendered
+      // output (compared against message().get(0).getData()) before activating this test.
+      assertThat(actual.message()).isEqualTo("xxxxx");
+    } finally {
+      // Release the interpreter opened above even when an assertion fails.
+      localInterpreter.close();
+    }
+  }
+
+  /**
+   * Runs {@code DESCRIBE AGGREGATES;} through a dedicated interpreter and expects success.
+   * Currently ignored (see the TODO below).
+   *
+   * <p>The dedicated interpreter is closed in a {@code finally} block so that enabling the
+   * test does not leave an opened interpreter behind.
+   */
+  @Test
+  @Ignore
+  //TODO(n.a.) activate test when using Java 8 and C* 3.x
+  public void should_describe_aggregate() throws Exception {
+    //Given
+    Properties properties = new Properties();
+    properties.setProperty(CASSANDRA_HOSTS, "127.0.0.1");
+    properties.setProperty(CASSANDRA_PORT,  "9042");
+    // Named differently from the shared `interpreter` fixture to avoid shadowing it.
+    Interpreter localInterpreter = new CassandraInterpreter(properties);
+    localInterpreter.open();
+
+    try {
+      final String query = "DESCRIBE AGGREGATES;";
+
+      //When
+      final InterpreterResult actual = localInterpreter.interpret(query, intrContext);
+
+      //Then
+      assertThat(actual.code()).isEqualTo(Code.SUCCESS);
+    } finally {
+      // Release the interpreter opened above even when an assertion fails.
+      localInterpreter.close();
+    }
+  }
+
+  /**
+   * Runs {@code DESCRIBE MATERIALIZED VIEWS;} through a dedicated interpreter and expects
+   * success. Currently ignored (see the TODO below).
+   *
+   * <p>The dedicated interpreter is closed in a {@code finally} block so that enabling the
+   * test does not leave an opened interpreter behind.
+   */
+  @Test
+  @Ignore
+  //TODO(n.a.) activate test when using Java 8 and C* 3.x
+  public void should_describe_materialized_view() throws Exception {
+    //Given
+    Properties properties = new Properties();
+    properties.setProperty(CASSANDRA_HOSTS, "127.0.0.1");
+    properties.setProperty(CASSANDRA_PORT,  "9042");
+    // Named differently from the shared `interpreter` fixture to avoid shadowing it.
+    Interpreter localInterpreter = new CassandraInterpreter(properties);
+    localInterpreter.open();
+
+    try {
+      final String query = "DESCRIBE MATERIALIZED VIEWS;";
+
+      //When
+      final InterpreterResult actual = localInterpreter.interpret(query, intrContext);
+
+      //Then
+      assertThat(actual.code()).isEqualTo(Code.SUCCESS);
+    } finally {
+      // Release the interpreter opened above even when an assertion fails.
+      localInterpreter.close();
+    }
+  }
+
+  /**
+   * {@code DESCRIBE TABLE live_data.complex_table;} must succeed and render the same HTML as
+   * the {@code /scalate/DescribeTable_live_data_complex_table.html} reference once both sides
+   * are normalized.
+   */
+  @Test
+  public void should_describe_table() throws Exception {
+    //Given
+    final String statement = "DESCRIBE TABLE live_data.complex_table;";
+    final String referenceHtml =
+            readTestResource("/scalate/DescribeTable_live_data_complex_table.html");
+    final String expectedHtml = reformatHtml(referenceHtml);
+
+    //When
+    final InterpreterResult result = interpreter.interpret(statement, intrContext);
+
+    //Then
+    assertThat(result.code()).isEqualTo(Code.SUCCESS);
+    final String renderedHtml = reformatHtml(result.message().get(0).getData());
+    assertThat(renderedHtml).isEqualTo(expectedHtml);
+  }
+
+  /**
+   * {@code DESCRIBE TYPE live_data.address;} must succeed and render the same HTML as the
+   * {@code /scalate/DescribeType_live_data_address.html} reference once both sides are
+   * normalized.
+   */
+  @Test
+  public void should_describe_udt() throws Exception {
+    //Given
+    final String statement = "DESCRIBE TYPE live_data.address;";
+    final String referenceHtml =
+            readTestResource("/scalate/DescribeType_live_data_address.html");
+    final String expectedHtml = reformatHtml(referenceHtml);
+
+    //When
+    final InterpreterResult result = interpreter.interpret(statement, intrContext);
+
+    //Then
+    assertThat(result.code()).isEqualTo(Code.SUCCESS);
+    final String renderedHtml = reformatHtml(result.message().get(0).getData());
+    assertThat(renderedHtml).isEqualTo(expectedHtml);
+  }
+
+  /**
+   * After {@code USE live_data;}, an unqualified {@code DESCRIBE TYPE address;} must succeed
+   * and render the same HTML as the
+   * {@code /scalate/DescribeType_live_data_address_within_current_keyspace.html} reference
+   * once both sides are normalized.
+   */
+  @Test
+  public void should_describe_udt_withing_logged_in_keyspace() throws Exception {
+    //Given
+    final String statement = "USE live_data;\n"
+            + "DESCRIBE TYPE address;";
+    final String referenceHtml = readTestResource(
+            "/scalate/DescribeType_live_data_address_within_current_keyspace.html");
+    final String expectedHtml = reformatHtml(referenceHtml);
+
+    //When
+    final InterpreterResult result = interpreter.interpret(statement, intrContext);
+
+    //Then
+    assertThat(result.code()).isEqualTo(Code.SUCCESS);
+    final String renderedHtml = reformatHtml(result.message().get(0).getData());
+    assertThat(renderedHtml).isEqualTo(expectedHtml);
+  }
+
+  @Test
+  public void should_error_describing_non_existing_table() throws Exception {
+    //Given
+    String query = "USE system;\n" +
+            "DESCRIBE TABLE complex_table;";
+
+    //When
+    final InterpreterResult actual = interpreter.interpret(query, intrContext);
+
+    //Then
+    assertThat(actual.code()).isEqualTo(Code.ERROR);
+    assertThat(actual.message().get(0).getData())
+            .contains("Cannot find table system.complex_table");
+  }
+
+  @Test
+  public void should_error_describing_non_existing_udt() throws Exception {
+    //Given
+    String query = "USE system;\n" +
+            "DESCRIBE TYPE address;";
+
+    //When
+    final InterpreterResult actual = interpreter.interpret(query, intrContext);
+
+    //Then
+    assertThat(actual.code()).isEqualTo(Code.ERROR);
+    assertThat(actual.message().get(0).getData()).contains("Cannot find type system.address");
+  }
+
+  /**
+   * {@code HELP;} must succeed and render the same HTML as the {@code /scalate/Help.html}
+   * reference once both sides are normalized.
+   */
+  @Test
+  public void should_show_help() throws Exception {
+    //Given
+    final String statement = "HELP;";
+    final String referenceHtml = readTestResource("/scalate/Help.html");
+    final String expectedHtml = reformatHtml(referenceHtml);
+
+    //When
+    final InterpreterResult result = interpreter.interpret(statement, intrContext);
+
+    //Then
+    assertThat(result.code()).isEqualTo(Code.SUCCESS);
+    final String renderedHtml = reformatHtml(result.message().get(0).getData());
+    assertThat(renderedHtml).isEqualTo(expectedHtml);
+  }
+
+  private static String reformatHtml(String rawHtml) {
+    return  rawHtml
+            .replaceAll("\\s*\n\\s*", "")
+            .replaceAll(">\\s+<", "><")
+            .replaceAll("(?s)data-target=\"#[a-f0-9-]+(?:_asCQL|_indices_asCQL)?\"", "")
+            .replaceAll("(?s)id=\"[a-f0-9-]+(?:_asCQL|_indices_asCQL)?\"", "")
+            .trim();
+  }
+
+  private static String readTestResource(String testResource) {
+    StringBuilder builder = new StringBuilder();
+    InputStream stream = testResource.getClass().getResourceAsStream(testResource);
+
+    try (BufferedReader br = new BufferedReader(new InputStreamReader(stream))) {
+      String line;
+      while ((line = br.readLine()) != null) {
+        builder.append(line).append("\n");
+      }
+    } catch (Exception ex) {
+      throw  new RuntimeException(ex);
+    }
+
+    return builder.toString();
+  }
+}