You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2020/04/24 14:33:13 UTC

[zeppelin] branch master updated: [ZEPPELIN-4764]. Support real time dashboard for jdbc interpreter

This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new 02749ab  [ZEPPELIN-4764]. Support real time dashboard for jdbc interpreter
02749ab is described below

commit 02749abc2b9c3993ff6c41e6f1ef9246cae4f077
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Sun Apr 19 16:07:55 2020 +0800

    [ZEPPELIN-4764]. Support real time dashboard for jdbc interpreter
    
    ### What is this PR for?
    This PR adds support for building real-time dashboards via the jdbc interpreter. Users just need to specify refreshInterval to enable it, e.g.
    
    ```
    %mysql(refreshInterval=2000)
    
    select * from flink_cdn.cdn_access_statistic;
    ```
    
    ### What type of PR is it?
    [Feature ]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-4764
    
    ### How should this be tested?
    * A unit test is added; it was also manually tested.
    
    ### Screenshots (if appropriate)
    ![jdbc_time](https://user-images.githubusercontent.com/164491/80071811-da66e400-8577-11ea-9a34-4efad46e9d51.gif)
    
    ![jdbc_refresh](https://user-images.githubusercontent.com/164491/80072138-66790b80-8578-11ea-936d-29a5b97cd941.gif)
    
    ### Questions:
    * Do the license files need an update? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Jeff Zhang <zj...@apache.org>
    
    Closes #3742 from zjffdu/ZEPPELIN-4764 and squashes the following commits:
    
    d03f9fef5 [Jeff Zhang] [ZEPPELIN-4764]. Support real time dashboard for jdbc interpreter
---
 jdbc/pom.xml                                       | 267 +++++++++++----------
 .../org/apache/zeppelin/jdbc/JDBCInterpreter.java  | 148 +++++++++---
 .../jdbc/JDBCInterpreterInterpolationTest.java     |  63 +++--
 .../apache/zeppelin/jdbc/JDBCInterpreterTest.java  | 253 +++++++++++++------
 jdbc/src/test/resources/log4j.properties           |  24 ++
 5 files changed, 495 insertions(+), 260 deletions(-)

diff --git a/jdbc/pom.xml b/jdbc/pom.xml
index 8085645..8e7ab85 100644
--- a/jdbc/pom.xml
+++ b/jdbc/pom.xml
@@ -33,6 +33,144 @@
   <version>0.9.0-SNAPSHOT</version>
   <name>Zeppelin: JDBC interpreter</name>
 
+  <properties>
+    <!--library versions-->
+    <interpreter.name>jdbc</interpreter.name>
+    <postgresql.version>9.4-1201-jdbc41</postgresql.version>
+    <hadoop.common.version>${hadoop2.7.version}</hadoop.common.version>
+    <h2.version>1.4.190</h2.version>
+    <commons.dbcp2.version>2.0.1</commons.dbcp2.version>
+
+    <!--test library versions-->
+    <mockrunner.jdbc.version>1.0.8</mockrunner.jdbc.version>
+  </properties>
+
+  <dependencies>
+
+	<dependency>
+      <groupId>org.postgresql</groupId>
+      <artifactId>postgresql</artifactId>
+      <version>${postgresql.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.h2database</groupId>
+      <artifactId>h2</artifactId>
+      <version>${h2.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-dbcp2</artifactId>
+      <version>${commons.dbcp2.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>com.mockrunner</groupId>
+      <artifactId>mockrunner-jdbc</artifactId>
+      <version>${mockrunner.jdbc.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <version>${hadoop.common.version}</version>
+      <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>com.sun.jersey</groupId>
+          <artifactId>jersey-core</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.sun.jersey</groupId>
+          <artifactId>jersey-json</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.sun.jersey</groupId>
+          <artifactId>jersey-server</artifactId>
+        </exclusion>
+
+        <exclusion>
+          <groupId>javax.servlet</groupId>
+          <artifactId>servlet-api</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.avro</groupId>
+          <artifactId>avro</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.jackrabbit</groupId>
+          <artifactId>jackrabbit-webdav</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-httpclient</groupId>
+          <artifactId>commons-httpclient</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.zookeeper</groupId>
+          <artifactId>zookeeper</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.eclipse.jgit</groupId>
+          <artifactId>org.eclipse.jgit</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.jcraft</groupId>
+          <artifactId>jsch</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    
+    <dependency>
+      <groupId>net.jodah</groupId>
+      <artifactId>concurrentunit</artifactId>
+      <version>0.4.4</version>
+      <scope>test</scope>
+    </dependency>
+
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <artifactId>maven-enforcer-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <artifactId>maven-dependency-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <artifactId>maven-resources-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <artifactId>maven-shade-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-checkstyle-plugin</artifactId>
+        <configuration>
+          <skip>false</skip>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+
   <profiles>
     <profile>
       <id>jdbc-hive</id>
@@ -228,133 +366,4 @@
     </profile>
   </profiles>
 
-  <properties>
-    <!--library versions-->
-    <interpreter.name>jdbc</interpreter.name>
-    <postgresql.version>9.4-1201-jdbc41</postgresql.version>
-    <hadoop.common.version>${hadoop2.7.version}</hadoop.common.version>
-    <h2.version>1.4.190</h2.version>
-    <commons.dbcp2.version>2.0.1</commons.dbcp2.version>
-
-    <!--test library versions-->
-    <mockrunner.jdbc.version>1.0.8</mockrunner.jdbc.version>
-  </properties>
-
-  <dependencies>
-
-	<dependency>
-      <groupId>org.postgresql</groupId>
-      <artifactId>postgresql</artifactId>
-      <version>${postgresql.version}</version>
-    </dependency>
-
-    <dependency>
-      <groupId>com.h2database</groupId>
-      <artifactId>h2</artifactId>
-      <version>${h2.version}</version>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.commons</groupId>
-      <artifactId>commons-lang3</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.commons</groupId>
-      <artifactId>commons-dbcp2</artifactId>
-      <version>${commons.dbcp2.version}</version>
-    </dependency>
-
-    <dependency>
-      <groupId>org.mockito</groupId>
-      <artifactId>mockito-all</artifactId>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>com.mockrunner</groupId>
-      <artifactId>mockrunner-jdbc</artifactId>
-      <version>${mockrunner.jdbc.version}</version>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-common</artifactId>
-      <version>${hadoop.common.version}</version>
-      <scope>provided</scope>
-      <exclusions>
-        <exclusion>
-          <groupId>com.sun.jersey</groupId>
-          <artifactId>jersey-core</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>com.sun.jersey</groupId>
-          <artifactId>jersey-json</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>com.sun.jersey</groupId>
-          <artifactId>jersey-server</artifactId>
-        </exclusion>
-
-        <exclusion>
-          <groupId>javax.servlet</groupId>
-          <artifactId>servlet-api</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.avro</groupId>
-          <artifactId>avro</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.jackrabbit</groupId>
-          <artifactId>jackrabbit-webdav</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>io.netty</groupId>
-          <artifactId>netty</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>commons-httpclient</groupId>
-          <artifactId>commons-httpclient</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.zookeeper</groupId>
-          <artifactId>zookeeper</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.eclipse.jgit</groupId>
-          <artifactId>org.eclipse.jgit</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>com.jcraft</groupId>
-          <artifactId>jsch</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-  </dependencies>
-
-  <build>
-    <plugins>
-      <plugin>
-        <artifactId>maven-enforcer-plugin</artifactId>
-      </plugin>
-      <plugin>
-        <artifactId>maven-dependency-plugin</artifactId>
-      </plugin>
-      <plugin>
-        <artifactId>maven-resources-plugin</artifactId>
-      </plugin>
-      <plugin>
-        <artifactId>maven-shade-plugin</artifactId>
-      </plugin>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-checkstyle-plugin</artifactId>
-        <configuration>
-          <skip>false</skip>
-        </configuration>
-      </plugin>
-    </plugins>
-  </build>
 </project>
diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java
index 1ef784d..2cea70c 100644
--- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java
+++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java
@@ -25,7 +25,6 @@ import org.apache.commons.dbcp2.PoolableConnectionFactory;
 import org.apache.commons.dbcp2.PoolingDriver;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.exception.ExceptionUtils;
-import org.apache.commons.lang3.mutable.MutableBoolean;
 import org.apache.commons.pool2.ObjectPool;
 import org.apache.commons.pool2.impl.GenericObjectPool;
 import org.apache.hadoop.conf.Configuration;
@@ -56,7 +55,9 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterException;
@@ -159,6 +160,9 @@ public class JDBCInterpreter extends KerberosInterpreter {
 
   private SqlSplitter sqlSplitter;
 
+  private Map<String, ScheduledExecutorService> refreshExecutorServices = new HashMap<>();
+  private Map<String, Boolean> paragraphCancelMap = new HashMap<>();
+
   public JDBCInterpreter(Properties property) {
     super(property);
     jdbcUserConfigurationsMap = new HashMap<>();
@@ -573,9 +577,32 @@ public class JDBCInterpreter extends KerberosInterpreter {
     return null;
   }
 
-  private String getResults(ResultSet resultSet, boolean isTableType, MutableBoolean isComplete)
+  private String getResults(ResultSet resultSet,
+                            boolean isTableType,
+                            String template)
       throws SQLException {
+
     ResultSetMetaData md = resultSet.getMetaData();
+
+    /**
+     * If html template is provided, only fetch the first row.
+     */
+    if (template != null) {
+      resultSet.next();
+      String result = "%html " + template + "\n";
+      for (int i = 1; i <= md.getColumnCount(); ++i) {
+        Object columnObject = resultSet.getObject(i);
+        String columnValue = null;
+        if (columnObject == null) {
+          columnValue = "null";
+        } else {
+          columnValue = resultSet.getString(i);
+        }
+        result = result.replace("{" + (i - 1) + "}", columnValue);
+      }
+      return result;
+    }
+
     StringBuilder msg;
     if (isTableType) {
       msg = new StringBuilder(TABLE_MAGIC_TAG);
@@ -598,9 +625,10 @@ public class JDBCInterpreter extends KerberosInterpreter {
     msg.append(NEWLINE);
 
     int displayRowCount = 0;
+    boolean truncate = false;
     while (resultSet.next()) {
       if (displayRowCount >= getMaxResult()) {
-        isComplete.setValue(false);
+        truncate = true;
         break;
       }
       for (int i = 1; i < md.getColumnCount() + 1; i++) {
@@ -620,6 +648,11 @@ public class JDBCInterpreter extends KerberosInterpreter {
       msg.append(NEWLINE);
       displayRowCount++;
     }
+
+    if (truncate) {
+      msg.append("\n" + ResultMessages.getExceedsLimitRowsMessage(getMaxResult(),
+              String.format("%s.%s", COMMON_KEY, MAX_LINE_KEY)).toString());
+    }
     return msg.toString();
   }
 
@@ -627,7 +660,8 @@ public class JDBCInterpreter extends KerberosInterpreter {
     return updatedCount < 0 && columnCount <= 0 ? true : false;
   }
 
-  public InterpreterResult executePrecode(InterpreterContext interpreterContext) {
+  public InterpreterResult executePrecode(InterpreterContext interpreterContext)
+          throws InterpreterException {
     InterpreterResult interpreterResult = null;
     for (String propertyKey : basePropertiesMap.keySet()) {
       String precode = getProperty(String.format("%s.precode", propertyKey));
@@ -648,12 +682,12 @@ public class JDBCInterpreter extends KerberosInterpreter {
   }
 
   private InterpreterResult executeSql(String propertyKey, String sql,
-      InterpreterContext interpreterContext) {
+      InterpreterContext context) throws InterpreterException {
     Connection connection = null;
     Statement statement;
     ResultSet resultSet = null;
-    String paragraphId = interpreterContext.getParagraphId();
-    String user = interpreterContext.getAuthenticationInfo().getUser();
+    String paragraphId = context.getParagraphId();
+    String user = context.getAuthenticationInfo().getUser();
 
     boolean splitQuery = false;
     String splitQueryProperty = getProperty(String.format("%s.%s", propertyKey, SPLIT_QURIES_KEY));
@@ -661,9 +695,8 @@ public class JDBCInterpreter extends KerberosInterpreter {
       splitQuery = true;
     }
 
-    InterpreterResult interpreterResult = new InterpreterResult(InterpreterResult.Code.SUCCESS);
     try {
-      connection = getConnection(propertyKey, interpreterContext);
+      connection = getConnection(propertyKey, context);
     } catch (Exception e) {
       String errorMsg = ExceptionUtils.getStackTrace(e);
       try {
@@ -671,8 +704,12 @@ public class JDBCInterpreter extends KerberosInterpreter {
       } catch (SQLException e1) {
         logger.error("Cannot close DBPool for user, propertyKey: " + user + propertyKey, e1);
       }
-      interpreterResult.add(errorMsg);
-      return new InterpreterResult(Code.ERROR, interpreterResult.message());
+      try {
+        context.out.write(errorMsg);
+      } catch (IOException ex) {
+        throw new InterpreterException("Fail to write output", ex);
+      }
+      return new InterpreterResult(Code.ERROR);
     }
     if (connection == null) {
       return new InterpreterResult(Code.ERROR, "Prefix not found.");
@@ -695,8 +732,8 @@ public class JDBCInterpreter extends KerberosInterpreter {
         statement = connection.createStatement();
 
         // fetch n+1 rows in order to indicate there's more rows available (for large selects)
-        statement.setFetchSize(interpreterContext.getIntLocalProperty("limit", getMaxResult()));
-        statement.setMaxRows(interpreterContext.getIntLocalProperty("limit", maxRows));
+        statement.setFetchSize(context.getIntLocalProperty("limit", getMaxResult()));
+        statement.setMaxRows(context.getIntLocalProperty("limit", maxRows));
 
         if (statement == null) {
           return new InterpreterResult(Code.ERROR, "Prefix not found.");
@@ -720,22 +757,18 @@ public class JDBCInterpreter extends KerberosInterpreter {
             // Regards that the command is DDL.
             if (isDDLCommand(statement.getUpdateCount(),
                 resultSet.getMetaData().getColumnCount())) {
-              interpreterResult.add(InterpreterResult.Type.TEXT,
-                  "Query executed successfully.");
+              context.out.write("%text Query executed successfully.\n");
             } else {
-              MutableBoolean isComplete = new MutableBoolean(true);
               String results = getResults(resultSet,
-                  !containsIgnoreCase(sqlToExecute, EXPLAIN_PREDICATE), isComplete);
-              interpreterResult.add(results);
-              if (!isComplete.booleanValue()) {
-                interpreterResult.add(ResultMessages.getExceedsLimitRowsMessage(getMaxResult(),
-                    String.format("%s.%s", COMMON_KEY, MAX_LINE_KEY)));
-              }
+                  !containsIgnoreCase(sqlToExecute, EXPLAIN_PREDICATE),
+                      context.getLocalProperties().get("template"));
+              context.out.write(results);
+              context.out.write("\n%text ");
             }
           } else {
             // Response contains either an update count or there are no results.
             int updateCount = statement.getUpdateCount();
-            interpreterResult.add(InterpreterResult.Type.TEXT,
+            context.out.write("\n%text " +
                 "Query executed successfully. Affected rows : " +
                     updateCount);
           }
@@ -754,9 +787,7 @@ public class JDBCInterpreter extends KerberosInterpreter {
       }
     } catch (Throwable e) {
       logger.error("Cannot run " + sql, e);
-      String errorMsg = ExceptionUtils.getStackTrace(e);
-      interpreterResult.add(errorMsg);
-      return new InterpreterResult(Code.ERROR, interpreterResult.message());
+      return new InterpreterResult(Code.ERROR,  ExceptionUtils.getStackTrace(e));
     } finally {
       //In case user ran an insert/update/upsert statement
       if (connection != null) {
@@ -769,7 +800,8 @@ public class JDBCInterpreter extends KerberosInterpreter {
       }
       getJDBCConfiguration(user).removeStatement(paragraphId);
     }
-    return interpreterResult;
+
+    return new InterpreterResult(Code.SUCCESS);
   }
 
   /**
@@ -801,17 +833,71 @@ public class JDBCInterpreter extends KerberosInterpreter {
     return Boolean.parseBoolean(getProperty("zeppelin.jdbc.interpolation", "false"));
   }
 
+  private boolean isRefreshMode(InterpreterContext context) {
+    return context.getLocalProperties().get("refreshInterval") != null;
+  }
+
   @Override
-  public InterpreterResult internalInterpret(String cmd, InterpreterContext contextInterpreter) {
+  public InterpreterResult internalInterpret(String cmd, InterpreterContext context)
+          throws InterpreterException {
     logger.debug("Run SQL command '{}'", cmd);
-    String propertyKey = getPropertyKey(contextInterpreter);
-    cmd = cmd.trim();
+    String propertyKey = getPropertyKey(context);
     logger.debug("PropertyKey: {}, SQL command: '{}'", propertyKey, cmd);
-    return executeSql(propertyKey, cmd, contextInterpreter);
+    if (!isRefreshMode(context)) {
+      return executeSql(propertyKey, cmd.trim(), context);
+    } else {
+      int refreshInterval = Integer.parseInt(context.getLocalProperties().get("refreshInterval"));
+      final String code = cmd.trim();
+      paragraphCancelMap.put(context.getParagraphId(), false);
+      ScheduledExecutorService refreshExecutor = Executors.newSingleThreadScheduledExecutor();
+      refreshExecutorServices.put(context.getParagraphId(), refreshExecutor);
+      final AtomicReference<InterpreterResult> interpreterResultRef = new AtomicReference();
+      refreshExecutor.scheduleAtFixedRate(() -> {
+        context.out.clear(false);
+        try {
+          InterpreterResult result = executeSql(propertyKey, code, context);
+          context.out.flush();
+          interpreterResultRef.set(result);
+          if (result.code() != Code.SUCCESS) {
+            refreshExecutor.shutdownNow();
+          }
+        } catch (Exception e) {
+          logger.warn("Fail to run sql", e);
+        }
+      }, 0, refreshInterval, TimeUnit.MILLISECONDS);
+
+      while (!refreshExecutor.isTerminated()) {
+        try {
+          Thread.sleep(1000);
+        } catch (InterruptedException e) {
+          logger.error("");
+        }
+      }
+      refreshExecutorServices.remove(context.getParagraphId());
+      if (paragraphCancelMap.getOrDefault(context.getParagraphId(), false)) {
+        return new InterpreterResult(Code.ERROR);
+      } else if (interpreterResultRef.get().code() == Code.ERROR) {
+        return interpreterResultRef.get();
+      } else {
+        return new InterpreterResult(Code.SUCCESS);
+      }
+    }
   }
 
   @Override
   public void cancel(InterpreterContext context) {
+
+    if (isRefreshMode(context)) {
+      logger.info("Shutdown refreshExecutorService for paragraph: " + context.getParagraphId());
+      ScheduledExecutorService executorService =
+              refreshExecutorServices.get(context.getParagraphId());
+      if (executorService != null) {
+        executorService.shutdownNow();
+      }
+      paragraphCancelMap.put(context.getParagraphId(), true);
+      return;
+    }
+
     logger.info("Cancel current query statement.");
     String paragraphId = context.getParagraphId();
     JDBCUserConfigurations jdbcUserConfigurations =
diff --git a/jdbc/src/test/java/org/apache/zeppelin/jdbc/JDBCInterpreterInterpolationTest.java b/jdbc/src/test/java/org/apache/zeppelin/jdbc/JDBCInterpreterInterpolationTest.java
index 1ff246b..0fbc2b5 100644
--- a/jdbc/src/test/java/org/apache/zeppelin/jdbc/JDBCInterpreterInterpolationTest.java
+++ b/jdbc/src/test/java/org/apache/zeppelin/jdbc/JDBCInterpreterInterpolationTest.java
@@ -17,7 +17,9 @@ package org.apache.zeppelin.jdbc;
 import com.mockrunner.jdbc.BasicJDBCTestCaseAdapter;
 import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.InterpreterOutput;
 import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.InterpreterResultMessage;
 import org.apache.zeppelin.resource.LocalResourcePool;
 import org.apache.zeppelin.resource.ResourcePool;
 import org.apache.zeppelin.user.AuthenticationInfo;
@@ -30,6 +32,7 @@ import java.nio.file.Path;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.Statement;
+import java.util.List;
 import java.util.Properties;
 
 import static java.lang.String.format;
@@ -70,11 +73,7 @@ public class JDBCInterpreterInterpolationTest extends BasicJDBCTestCaseAdapter {
                 "('mou', 'mouse');");
     resourcePool = new LocalResourcePool("JdbcInterpolationTest");
 
-    interpreterContext = InterpreterContext.builder()
-        .setParagraphId("paragraph_1")
-        .setAuthenticationInfo(new AuthenticationInfo("testUser"))
-        .setResourcePool(resourcePool)
-        .build();
+    interpreterContext = getInterpreterContext();
   }
 
   @Test
@@ -97,9 +96,12 @@ public class JDBCInterpreterInterpolationTest extends BasicJDBCTestCaseAdapter {
     t.open();
     InterpreterResult interpreterResult = t.interpret(sqlQuery, interpreterContext);
     assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code());
-    assertEquals(InterpreterResult.Type.TABLE, interpreterResult.message().get(0).getType());
-    assertEquals(1, interpreterResult.message().size());
-    assertEquals("ID\tNAME\n", interpreterResult.message().get(0).getData());
+
+    List<InterpreterResultMessage> resultMessages =
+            interpreterContext.out.toInterpreterResultMessage();
+    assertEquals(1, resultMessages.size());
+    assertEquals(InterpreterResult.Type.TABLE, resultMessages.get(0).getType());
+    assertEquals("ID\tNAME\n", resultMessages.get(0).getData());
 
     //
     // 1 result expected because "zeppelin.jdbc.interpolation" set to "true" ...
@@ -107,12 +109,14 @@ public class JDBCInterpreterInterpolationTest extends BasicJDBCTestCaseAdapter {
     properties.setProperty("zeppelin.jdbc.interpolation", "true");
     t = new JDBCInterpreter(properties);
     t.open();
+    interpreterContext = getInterpreterContext();
     interpreterResult = t.interpret(sqlQuery, interpreterContext);
     assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code());
-    assertEquals(InterpreterResult.Type.TABLE, interpreterResult.message().get(0).getType());
-    assertEquals(1, interpreterResult.message().size());
-    assertEquals("ID\tNAME\nmem\tmemory\n",
-            interpreterResult.message().get(0).getData());
+
+    resultMessages = interpreterContext.out.toInterpreterResultMessage();
+    assertEquals(1, resultMessages.size());
+    assertEquals(InterpreterResult.Type.TABLE, resultMessages.get(0).getType());
+    assertEquals("ID\tNAME\nmem\tmemory\n", resultMessages.get(0).getData());
   }
 
   @Test
@@ -136,9 +140,12 @@ public class JDBCInterpreterInterpolationTest extends BasicJDBCTestCaseAdapter {
     String sqlQuery = "select * from test_table where id = '{kbd}'";
     InterpreterResult interpreterResult = t.interpret(sqlQuery, interpreterContext);
     assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code());
-    assertEquals(InterpreterResult.Type.TABLE, interpreterResult.message().get(0).getType());
-    assertEquals(1, interpreterResult.message().size());
-    assertEquals("ID\tNAME\n", interpreterResult.message().get(0).getData());
+
+    List<InterpreterResultMessage> resultMessages =
+            interpreterContext.out.toInterpreterResultMessage();
+    assertEquals(1, resultMessages.size());
+    assertEquals(InterpreterResult.Type.TABLE, resultMessages.get(0).getType());
+    assertEquals("ID\tNAME\n", resultMessages.get(0).getData());
 
     resourcePool.put("itemId", "key");
 
@@ -146,12 +153,13 @@ public class JDBCInterpreterInterpolationTest extends BasicJDBCTestCaseAdapter {
     // 1 result expected because z-variable 'item' is 'key' ...
     //
     sqlQuery = "select * from test_table where id = '{itemId}'";
+    interpreterContext = getInterpreterContext();
     interpreterResult = t.interpret(sqlQuery, interpreterContext);
     assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code());
-    assertEquals(InterpreterResult.Type.TABLE, interpreterResult.message().get(0).getType());
-    assertEquals(1, interpreterResult.message().size());
-    assertEquals("ID\tNAME\nkey\tkeyboard\n",
-            interpreterResult.message().get(0).getData());
+    resultMessages = interpreterContext.out.toInterpreterResultMessage();
+    assertEquals(1, resultMessages.size());
+    assertEquals(InterpreterResult.Type.TABLE, resultMessages.get(0).getType());
+    assertEquals("ID\tNAME\nkey\tkeyboard\n", resultMessages.get(0).getData());
   }
 
   @Test
@@ -176,10 +184,19 @@ public class JDBCInterpreterInterpolationTest extends BasicJDBCTestCaseAdapter {
     String sqlQuery = "select * from test_table where name regexp '[aeiou]{{2}}'";
     InterpreterResult interpreterResult = t.interpret(sqlQuery, interpreterContext);
     assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code());
-    assertEquals(InterpreterResult.Type.TABLE, interpreterResult.message().get(0).getType());
-    assertEquals(1, interpreterResult.message().size());
-    assertEquals("ID\tNAME\nkey\tkeyboard\nmou\tmouse\n",
-                 interpreterResult.message().get(0).getData());
+    List<InterpreterResultMessage> resultMessages =
+            interpreterContext.out.toInterpreterResultMessage();
+    assertEquals(1, resultMessages.size());
+    assertEquals(InterpreterResult.Type.TABLE, resultMessages.get(0).getType());
+    assertEquals("ID\tNAME\nkey\tkeyboard\nmou\tmouse\n", resultMessages.get(0).getData());
   }
 
+  private InterpreterContext getInterpreterContext() {
+    return InterpreterContext.builder()
+            .setParagraphId("paragraph_1")
+            .setAuthenticationInfo(new AuthenticationInfo("testUser"))
+            .setResourcePool(resourcePool)
+            .setInterpreterOut(new InterpreterOutput(null))
+            .build();
+  }
 }
diff --git a/jdbc/src/test/java/org/apache/zeppelin/jdbc/JDBCInterpreterTest.java b/jdbc/src/test/java/org/apache/zeppelin/jdbc/JDBCInterpreterTest.java
index d7d7677..79c15d8 100644
--- a/jdbc/src/test/java/org/apache/zeppelin/jdbc/JDBCInterpreterTest.java
+++ b/jdbc/src/test/java/org/apache/zeppelin/jdbc/JDBCInterpreterTest.java
@@ -30,9 +30,13 @@ import static org.apache.zeppelin.jdbc.JDBCInterpreter.DEFAULT_URL;
 import static org.apache.zeppelin.jdbc.JDBCInterpreter.DEFAULT_PRECODE;
 import static org.apache.zeppelin.jdbc.JDBCInterpreter.PRECODE_KEY_TEMPLATE;
 
+import net.jodah.concurrentunit.Waiter;
+import org.apache.zeppelin.interpreter.InterpreterOutput;
+import org.apache.zeppelin.interpreter.InterpreterResultMessage;
 import org.junit.Before;
 import org.junit.Test;
 import static org.apache.zeppelin.jdbc.JDBCInterpreter.STATEMENT_PRECODE_KEY_TEMPLATE;
+import static org.junit.Assert.fail;
 
 
 import java.io.IOException;
@@ -41,12 +45,12 @@ import java.nio.file.Path;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
-import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.TimeoutException;
 
 import com.mockrunner.jdbc.BasicJDBCTestCaseAdapter;
 
@@ -67,7 +71,7 @@ import org.apache.zeppelin.user.UsernamePassword;
  */
 public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter {
   static String jdbcConnection;
-  InterpreterContext interpreterContext;
+  InterpreterContext context;
 
   private static String getJdbcConnection() throws IOException {
     if (null == jdbcConnection) {
@@ -102,8 +106,10 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter {
             "insert into test_table(id, name) values ('a', 'a_name'),('b', 'b_name'),('c', ?);");
     insertStatement.setString(1, null);
     insertStatement.execute();
-    interpreterContext = InterpreterContext.builder()
+    context = InterpreterContext.builder()
         .setAuthenticationInfo(new AuthenticationInfo("testUser"))
+        .setParagraphId("paragraphId")
+        .setInterpreterOut(new InterpreterOutput(null))
         .build();
   }
 
@@ -133,7 +139,7 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter {
   }
 
   @Test
-  public void testForMapPrefix() throws SQLException, IOException, InterpreterException {
+  public void testForMapPrefix() throws IOException, InterpreterException {
     Properties properties = new Properties();
     properties.setProperty("common.max_count", "1000");
     properties.setProperty("common.max_retry", "3");
@@ -159,7 +165,7 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter {
   }
 
   @Test
-  public void testDefaultProperties() throws SQLException {
+  public void testDefaultProperties() {
     JDBCInterpreter jdbcInterpreter = new JDBCInterpreter(getJDBCTestProperties());
 
     assertEquals("org.postgresql.Driver", jdbcInterpreter.getProperty(DEFAULT_DRIVER));
@@ -170,7 +176,7 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter {
   }
 
   @Test
-  public void testSelectQuery() throws SQLException, IOException, InterpreterException {
+  public void testSelectQuery() throws IOException, InterpreterException {
     Properties properties = new Properties();
     properties.setProperty("common.max_count", "1000");
     properties.setProperty("common.max_retry", "3");
@@ -183,18 +189,74 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter {
 
     String sqlQuery = "select * from test_table WHERE ID in ('a', 'b'); ";
 
-    InterpreterResult interpreterResult = t.interpret(sqlQuery, interpreterContext);
+    InterpreterResult interpreterResult = t.interpret(sqlQuery, context);
 
     assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code());
-    assertEquals(InterpreterResult.Type.TABLE, interpreterResult.message().get(0).getType());
-    assertEquals("ID\tNAME\na\ta_name\nb\tb_name\n", interpreterResult.message().get(0).getData());
+    List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage();
+    assertEquals(InterpreterResult.Type.TABLE, resultMessages.get(0).getType());
+    assertEquals("ID\tNAME\na\ta_name\nb\tb_name\n", resultMessages.get(0).getData());
 
-    interpreterContext.getLocalProperties().put("limit", "1");
-    interpreterResult = t.interpret(sqlQuery, interpreterContext);
+    context = getInterpreterContext();
+    context.getLocalProperties().put("limit", "1");
+    interpreterResult = t.interpret(sqlQuery, context);
 
+    resultMessages = context.out.toInterpreterResultMessage();
     assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code());
-    assertEquals(InterpreterResult.Type.TABLE, interpreterResult.message().get(0).getType());
-    assertEquals("ID\tNAME\na\ta_name\n", interpreterResult.message().get(0).getData());
+    assertEquals(InterpreterResult.Type.TABLE, resultMessages.get(0).getType());
+    assertEquals("ID\tNAME\na\ta_name\n", resultMessages.get(0).getData());
+  }
+
+  @Test
+  public void testSelectWithRefresh() throws IOException, InterruptedException, TimeoutException {
+    Properties properties = new Properties();
+    properties.setProperty("common.max_count", "1000");
+    properties.setProperty("common.max_retry", "3");
+    properties.setProperty("default.driver", "org.h2.Driver");
+    properties.setProperty("default.url", getJdbcConnection());
+    properties.setProperty("default.user", "");
+    properties.setProperty("default.password", "");
+    JDBCInterpreter t = new JDBCInterpreter(properties);
+    t.open();
+
+    final Waiter waiter = new Waiter();
+    context.getLocalProperties().put("refreshInterval", "1000");
+    Thread thread = new Thread(() -> {
+      String sqlQuery = "select * from test_table WHERE ID in ('a', 'b');";
+      try {
+        InterpreterResult interpreterResult = t.interpret(sqlQuery, context);
+        // Assert through the waiter so a failure on this thread is rethrown by await().
+        waiter.assertEquals(InterpreterResult.Code.ERROR, interpreterResult.code());
+      } catch (InterpreterException e) {
+        waiter.fail(e);
+      }
+      waiter.resume();
+    });
+    thread.start();
+
+    Thread.sleep(5000); // interpret() is expected to still be running when cancel() is called
+    t.cancel(context);
+    waiter.await(5000);
+  }
+
+  @Test
+  public void testInvalidSelectWithRefresh() throws IOException, InterpreterException {
+    // With refreshInterval set, a query on a missing table is expected to return ERROR.
+    Properties h2Props = new Properties();
+    h2Props.setProperty("common.max_count", "1000");
+    h2Props.setProperty("common.max_retry", "3");
+    h2Props.setProperty("default.driver", "org.h2.Driver");
+    h2Props.setProperty("default.url", getJdbcConnection());
+    h2Props.setProperty("default.user", "");
+    h2Props.setProperty("default.password", "");
+    JDBCInterpreter interpreter = new JDBCInterpreter(h2Props);
+    interpreter.open();
+    context.getLocalProperties().put("refreshInterval", "1000");
+
+    InterpreterResult result = interpreter.interpret("select * from invalid_table;", context);
+    assertEquals(InterpreterResult.Code.ERROR, result.code());
+
+    String errorText = result.message().get(0).getData();
+    assertTrue(result.toString(), errorText.contains("Table \"INVALID_TABLE\" not found;"));
   }
 
   @Test
@@ -211,15 +273,15 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter {
 
     String sqlQuery = "select NAME as SOME_OTHER_NAME from test_table limit 1";
 
-    InterpreterResult interpreterResult = t.interpret(sqlQuery, interpreterContext);
-
+    InterpreterResult interpreterResult = t.interpret(sqlQuery, context);
+    List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage();
     assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code());
-    assertEquals(InterpreterResult.Type.TABLE, interpreterResult.message().get(0).getType());
-    assertEquals("SOME_OTHER_NAME\na_name\n", interpreterResult.message().get(0).getData());
+    assertEquals(InterpreterResult.Type.TABLE, resultMessages.get(0).getType());
+    assertEquals("SOME_OTHER_NAME\na_name\n", resultMessages.get(0).getData());
   }
 
   @Test
-  public void testSplitSqlQuery() throws SQLException, IOException {
+  public void testSplitSqlQuery() {
     String sqlQuery = "insert into test_table(id, name) values ('a', ';\"');" +
         "select * from test_table;" +
         "select * from test_table WHERE ID = \";'\";" +
@@ -250,7 +312,7 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter {
   }
 
   @Test
-  public void testQueryWithEscapedCharacters() throws SQLException, IOException,
+  public void testQueryWithEscapedCharacters() throws IOException,
           InterpreterException {
     String sqlQuery = "select '\\n', ';';" +
         "select replace('A\\;B', '\\', 'text');" +
@@ -268,21 +330,22 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter {
     JDBCInterpreter t = new JDBCInterpreter(properties);
     t.open();
 
-    InterpreterResult interpreterResult = t.interpret(sqlQuery, interpreterContext);
+    InterpreterResult interpreterResult = t.interpret(sqlQuery, context);
 
     assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code());
-    assertEquals(InterpreterResult.Type.TABLE, interpreterResult.message().get(0).getType());
-    assertEquals(InterpreterResult.Type.TABLE, interpreterResult.message().get(1).getType());
-    assertEquals(InterpreterResult.Type.TABLE, interpreterResult.message().get(2).getType());
-    assertEquals(InterpreterResult.Type.TABLE, interpreterResult.message().get(3).getType());
-    assertEquals("'\\n'\t';'\n\\n\t;\n", interpreterResult.message().get(0).getData());
-    assertEquals("'Atext;B'\nAtext;B\n", interpreterResult.message().get(1).getData());
-    assertEquals("'\\'\t';'\n\\\t;\n", interpreterResult.message().get(2).getData());
-    assertEquals("''''\t';'\n'\t;\n", interpreterResult.message().get(3).getData());
+    List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage();
+    assertEquals(InterpreterResult.Type.TABLE, resultMessages.get(0).getType());
+    assertEquals(InterpreterResult.Type.TABLE, resultMessages.get(1).getType());
+    assertEquals(InterpreterResult.Type.TABLE, resultMessages.get(2).getType());
+    assertEquals(InterpreterResult.Type.TABLE, resultMessages.get(3).getType());
+    assertEquals("'\\n'\t';'\n\\n\t;\n", resultMessages.get(0).getData());
+    assertEquals("'Atext;B'\nAtext;B\n", resultMessages.get(1).getData());
+    assertEquals("'\\'\t';'\n\\\t;\n", resultMessages.get(2).getData());
+    assertEquals("''''\t';'\n'\t;\n", resultMessages.get(3).getData());
   }
 
   @Test
-  public void testSelectMultipleQueries() throws SQLException, IOException, InterpreterException {
+  public void testSelectMultipleQueries() throws IOException, InterpreterException {
     Properties properties = new Properties();
     properties.setProperty("common.max_count", "1000");
     properties.setProperty("common.max_retry", "3");
@@ -296,20 +359,22 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter {
 
     String sqlQuery = "select * from test_table;" +
         "select * from test_table WHERE ID = ';';";
-    InterpreterResult interpreterResult = t.interpret(sqlQuery, interpreterContext);
+    InterpreterResult interpreterResult = t.interpret(sqlQuery, context);
     assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code());
-    assertEquals(2, interpreterResult.message().size());
 
-    assertEquals(InterpreterResult.Type.TABLE, interpreterResult.message().get(0).getType());
+    List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage();
+    assertEquals(2, resultMessages.size());
+
+    assertEquals(InterpreterResult.Type.TABLE, resultMessages.get(0).getType());
     assertEquals("ID\tNAME\na\ta_name\nb\tb_name\nc\tnull\n",
-            interpreterResult.message().get(0).getData());
+            resultMessages.get(0).getData());
 
-    assertEquals(InterpreterResult.Type.TABLE, interpreterResult.message().get(1).getType());
-    assertEquals("ID\tNAME\n", interpreterResult.message().get(1).getData());
+    assertEquals(InterpreterResult.Type.TABLE, resultMessages.get(1).getType());
+    assertEquals("ID\tNAME\n", resultMessages.get(1).getData());
   }
 
   @Test
-  public void testDefaultSplitQuries() throws SQLException, IOException, InterpreterException {
+  public void testDefaultSplitQuries() throws IOException, InterpreterException {
     Properties properties = new Properties();
     properties.setProperty("common.max_count", "1000");
     properties.setProperty("common.max_retry", "3");
@@ -322,17 +387,19 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter {
 
     String sqlQuery = "select * from test_table;" +
         "select * from test_table WHERE ID = ';';";
-    InterpreterResult interpreterResult = t.interpret(sqlQuery, interpreterContext);
+    InterpreterResult interpreterResult = t.interpret(sqlQuery, context);
     assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code());
-    assertEquals(1, interpreterResult.message().size());
 
-    assertEquals(InterpreterResult.Type.TABLE, interpreterResult.message().get(0).getType());
+    List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage();
+    assertEquals(1, resultMessages.size());
+
+    assertEquals(InterpreterResult.Type.TABLE, resultMessages.get(0).getType());
     assertEquals("ID\tNAME\na\ta_name\nb\tb_name\nc\tnull\n",
-            interpreterResult.message().get(0).getData());
+            resultMessages.get(0).getData());
   }
 
   @Test
-  public void testSelectQueryWithNull() throws SQLException, IOException, InterpreterException {
+  public void testSelectQueryWithNull() throws IOException, InterpreterException {
     Properties properties = new Properties();
     properties.setProperty("common.max_count", "1000");
     properties.setProperty("common.max_retry", "3");
@@ -345,16 +412,17 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter {
 
     String sqlQuery = "select * from test_table WHERE ID = 'c'";
 
-    InterpreterResult interpreterResult = t.interpret(sqlQuery, interpreterContext);
+    InterpreterResult interpreterResult = t.interpret(sqlQuery, context);
 
+    List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage();
     assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code());
-    assertEquals(InterpreterResult.Type.TABLE, interpreterResult.message().get(0).getType());
-    assertEquals("ID\tNAME\nc\tnull\n", interpreterResult.message().get(0).getData());
+    assertEquals(InterpreterResult.Type.TABLE, resultMessages.get(0).getType());
+    assertEquals("ID\tNAME\nc\tnull\n", resultMessages.get(0).getData());
   }
 
 
   @Test
-  public void testSelectQueryMaxResult() throws SQLException, IOException, InterpreterException {
+  public void testSelectQueryMaxResult() throws IOException, InterpreterException {
     Properties properties = new Properties();
     properties.setProperty("common.max_count", "1");
     properties.setProperty("common.max_retry", "3");
@@ -367,13 +435,15 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter {
 
     String sqlQuery = "select * from test_table";
 
-    InterpreterResult interpreterResult = t.interpret(sqlQuery, interpreterContext);
+    InterpreterResult interpreterResult = t.interpret(sqlQuery, context);
 
     assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code());
-    assertEquals(InterpreterResult.Type.TABLE, interpreterResult.message().get(0).getType());
-    assertEquals("ID\tNAME\na\ta_name\n", interpreterResult.message().get(0).getData());
-    assertEquals(InterpreterResult.Type.HTML, interpreterResult.message().get(1).getType());
-    assertTrue(interpreterResult.message().get(1).getData().contains("alert-warning"));
+
+    List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage();
+    assertEquals(InterpreterResult.Type.TABLE, resultMessages.get(0).getType());
+    assertEquals("ID\tNAME\na\ta_name\n", resultMessages.get(0).getData());
+    assertEquals(InterpreterResult.Type.HTML, resultMessages.get(1).getType());
+    assertTrue(resultMessages.get(1).getData().contains("Output is truncated"));
   }
 
   @Test
@@ -400,7 +470,7 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter {
   }
 
   @Test
-  public void testAutoCompletion() throws SQLException, IOException, InterpreterException {
+  public void testAutoCompletion() throws IOException, InterpreterException {
     Properties properties = new Properties();
     properties.setProperty("common.max_count", "1000");
     properties.setProperty("common.max_retry", "3");
@@ -411,10 +481,10 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter {
     JDBCInterpreter jdbcInterpreter = new JDBCInterpreter(properties);
     jdbcInterpreter.open();
 
-    jdbcInterpreter.interpret("", interpreterContext);
+    jdbcInterpreter.interpret("", context);
 
     List<InterpreterCompletion> completionList = jdbcInterpreter.completion("sel", 3,
-            interpreterContext);
+            context);
 
     InterpreterCompletion correctCompletionKeyword = new InterpreterCompletion("select", "select",
             CompletionType.keyword.name());
@@ -452,7 +522,7 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter {
   }
 
   @Test
-  public void testMultiTenant() throws SQLException, IOException, InterpreterException {
+  public void testMultiTenant() throws IOException, InterpreterException {
     /*
      * assume that the database user is 'dbuser' and password is 'dbpassword'
      * 'jdbc1' interpreter has user('dbuser')/password('dbpassword') property
@@ -471,6 +541,7 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter {
     jdbc1.open();
     InterpreterContext ctx1 = InterpreterContext.builder()
         .setAuthenticationInfo(user1Credential)
+        .setInterpreterOut(new InterpreterOutput(null))
         .setReplName("jdbc1")
         .build();
     jdbc1.interpret("", ctx1);
@@ -497,6 +568,7 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter {
     jdbc1.open();
     InterpreterContext ctx3 = InterpreterContext.builder()
         .setAuthenticationInfo(user2Credential)
+        .setInterpreterOut(new InterpreterOutput(null))
         .setReplName("jdbc1")
         .build();
     jdbc1.interpret("", ctx3);
@@ -510,6 +582,7 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter {
     jdbc2.open();
     InterpreterContext ctx4 = InterpreterContext.builder()
         .setAuthenticationInfo(user2Credential)
+        .setInterpreterOut(new InterpreterOutput(null))
         .setReplName("jdbc2")
         .build();
     jdbc2.interpret("", ctx4);
@@ -521,7 +594,7 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter {
   }
 
   @Test
-  public void testPrecode() throws SQLException, IOException, InterpreterException {
+  public void testPrecode() throws IOException, InterpreterException {
     Properties properties = new Properties();
     properties.setProperty("default.driver", "org.h2.Driver");
     properties.setProperty("default.url", getJdbcConnection());
@@ -531,19 +604,24 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter {
             "create table test_precode (id int); insert into test_precode values (1);");
     JDBCInterpreter jdbcInterpreter = new JDBCInterpreter(properties);
     jdbcInterpreter.open();
-    jdbcInterpreter.executePrecode(interpreterContext);
+    jdbcInterpreter.executePrecode(context);
 
-    String sqlQuery = "select *from test_precode";
+    String sqlQuery = "select * from test_precode";
 
-    InterpreterResult interpreterResult = jdbcInterpreter.interpret(sqlQuery, interpreterContext);
+    InterpreterResult interpreterResult = jdbcInterpreter.interpret(sqlQuery, context);
 
     assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code());
-    assertEquals(InterpreterResult.Type.TABLE, interpreterResult.message().get(0).getType());
-    assertEquals("ID\n1\n", interpreterResult.message().get(0).getData());
+    List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage();
+    assertEquals(2, resultMessages.size());
+    assertEquals(InterpreterResult.Type.TEXT, resultMessages.get(0).getType());
+    assertEquals("Query executed successfully. Affected rows : 0\n",
+            resultMessages.get(0).getData());
+    assertEquals(InterpreterResult.Type.TABLE, resultMessages.get(1).getType());
+    assertEquals("ID\n1\n", resultMessages.get(1).getData());
   }
 
   @Test
-  public void testIncorrectPrecode() throws SQLException, IOException {
+  public void testIncorrectPrecode() throws IOException, InterpreterException {
     Properties properties = new Properties();
     properties.setProperty("default.driver", "org.h2.Driver");
     properties.setProperty("default.url", getJdbcConnection());
@@ -557,14 +635,14 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter {
     properties.setProperty(String.format(PRECODE_KEY_TEMPLATE, "incorrect"), "incorrect command");
     JDBCInterpreter jdbcInterpreter = new JDBCInterpreter(properties);
     jdbcInterpreter.open();
-    InterpreterResult interpreterResult = jdbcInterpreter.executePrecode(interpreterContext);
+    InterpreterResult interpreterResult = jdbcInterpreter.executePrecode(context);
 
     assertEquals(InterpreterResult.Code.ERROR, interpreterResult.code());
     assertEquals(InterpreterResult.Type.TEXT, interpreterResult.message().get(0).getType());
   }
 
   @Test
-  public void testPrecodeWithAnotherPrefix() throws SQLException, IOException,
+  public void testPrecodeWithAnotherPrefix() throws IOException,
           InterpreterException {
     Properties properties = new Properties();
     properties.setProperty("anotherPrefix.driver", "org.h2.Driver");
@@ -580,6 +658,7 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter {
     localProperties.put("db", "anotherPrefix");
     InterpreterContext context = InterpreterContext.builder()
         .setAuthenticationInfo(new AuthenticationInfo("testUser"))
+        .setInterpreterOut(new InterpreterOutput(null))
         .setLocalProperties(localProperties)
         .build();
     jdbcInterpreter.executePrecode(context);
@@ -588,13 +667,19 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter {
 
     InterpreterResult interpreterResult = jdbcInterpreter.interpret(sqlQuery, context);
 
+    List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage();
     assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code());
-    assertEquals(InterpreterResult.Type.TABLE, interpreterResult.message().get(0).getType());
-    assertEquals("ID\n2\n", interpreterResult.message().get(0).getData());
+
+    assertEquals(2, resultMessages.size());
+    assertEquals(InterpreterResult.Type.TEXT, resultMessages.get(0).getType());
+    assertEquals("Query executed successfully. Affected rows : 0\n",
+            resultMessages.get(0).getData());
+    assertEquals(InterpreterResult.Type.TABLE, resultMessages.get(1).getType());
+    assertEquals("ID\n2\n", resultMessages.get(1).getData());
   }
 
   @Test
-  public void testStatementPrecode() throws SQLException, IOException, InterpreterException {
+  public void testStatementPrecode() throws IOException, InterpreterException {
     Properties properties = new Properties();
     properties.setProperty("default.driver", "org.h2.Driver");
     properties.setProperty("default.url", getJdbcConnection());
@@ -606,15 +691,16 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter {
 
     String sqlQuery = "select @v";
 
-    InterpreterResult interpreterResult = jdbcInterpreter.interpret(sqlQuery, interpreterContext);
+    InterpreterResult interpreterResult = jdbcInterpreter.interpret(sqlQuery, context);
 
     assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code());
-    assertEquals(InterpreterResult.Type.TABLE, interpreterResult.message().get(0).getType());
-    assertEquals("@V\nstatement\n", interpreterResult.message().get(0).getData());
+    List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage();
+    assertEquals(InterpreterResult.Type.TABLE, resultMessages.get(0).getType());
+    assertEquals("@V\nstatement\n", resultMessages.get(0).getData());
   }
 
   @Test
-  public void testIncorrectStatementPrecode() throws SQLException, IOException,
+  public void testIncorrectStatementPrecode() throws IOException,
           InterpreterException {
     Properties properties = new Properties();
     properties.setProperty("default.driver", "org.h2.Driver");
@@ -627,14 +713,16 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter {
 
     String sqlQuery = "select 1";
 
-    InterpreterResult interpreterResult = jdbcInterpreter.interpret(sqlQuery, interpreterContext);
+    InterpreterResult interpreterResult = jdbcInterpreter.interpret(sqlQuery, context);
 
     assertEquals(InterpreterResult.Code.ERROR, interpreterResult.code());
     assertEquals(InterpreterResult.Type.TEXT, interpreterResult.message().get(0).getType());
+    assertTrue(interpreterResult.toString(),
+            interpreterResult.message().get(0).getData().contains("Syntax error"));
   }
 
   @Test
-  public void testStatementPrecodeWithAnotherPrefix() throws SQLException, IOException,
+  public void testStatementPrecodeWithAnotherPrefix() throws IOException,
           InterpreterException {
     Properties properties = new Properties();
     properties.setProperty("anotherPrefix.driver", "org.h2.Driver");
@@ -650,6 +738,7 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter {
     localProperties.put("db", "anotherPrefix");
     InterpreterContext context = InterpreterContext.builder()
         .setAuthenticationInfo(new AuthenticationInfo("testUser"))
+        .setInterpreterOut(new InterpreterOutput(null))
         .setLocalProperties(localProperties)
         .build();
 
@@ -658,12 +747,13 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter {
     InterpreterResult interpreterResult = jdbcInterpreter.interpret(sqlQuery, context);
 
     assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code());
-    assertEquals(InterpreterResult.Type.TABLE, interpreterResult.message().get(0).getType());
-    assertEquals("@V\nstatementAnotherPrefix\n", interpreterResult.message().get(0).getData());
+    List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage();
+    assertEquals(InterpreterResult.Type.TABLE, resultMessages.get(0).getType());
+    assertEquals("@V\nstatementAnotherPrefix\n", resultMessages.get(0).getData());
   }
 
   @Test
-  public void testSplitSqlQueryWithComments() throws SQLException, IOException,
+  public void testSplitSqlQueryWithComments() throws IOException,
           InterpreterException {
     Properties properties = new Properties();
     properties.setProperty("common.max_count", "1000");
@@ -687,8 +777,17 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter {
         "select * from test_table WHERE ID = ';--';\n" +
         "select * from test_table WHERE ID = '/*'; -- test";
 
-    InterpreterResult interpreterResult = t.interpret(sqlQuery, interpreterContext);
+    InterpreterResult interpreterResult = t.interpret(sqlQuery, context);
     assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code());
-    assertEquals(3, interpreterResult.message().size());
+    List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage();
+    assertEquals(3, resultMessages.size());
+  }
+
+  private InterpreterContext getInterpreterContext() {
+    AuthenticationInfo testUser = new AuthenticationInfo("testUser");
+    InterpreterOutput freshOutput = new InterpreterOutput(null); // new output per context
+    return InterpreterContext.builder().setAuthenticationInfo(testUser)
+            .setParagraphId("paragraphId")
+            .setInterpreterOut(freshOutput).build();
   }
 }
diff --git a/jdbc/src/test/resources/log4j.properties b/jdbc/src/test/resources/log4j.properties
new file mode 100644
index 0000000..4725615
--- /dev/null
+++ b/jdbc/src/test/resources/log4j.properties
@@ -0,0 +1,24 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Root logger option
+log4j.rootLogger=INFO, stdout
+
+# Direct log messages to stdout
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%5p [%d] ({%t} %F[%M]:%L) - %m%n