You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by jo...@apache.org on 2022/08/03 08:44:10 UTC

[zeppelin] branch master updated: [ZEPPELIN-5785] Remove ksql interpreter (#4434)

This is an automated email from the ASF dual-hosted git repository.

jongyoul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new 9b9e9a6057 [ZEPPELIN-5785] Remove ksql interpreter (#4434)
9b9e9a6057 is described below

commit 9b9e9a6057eb56e432818115c4caf64d1254350f
Author: jshjsh06 <35...@users.noreply.github.com>
AuthorDate: Wed Aug 3 17:44:01 2022 +0900

    [ZEPPELIN-5785] Remove ksql interpreter (#4434)
    
    Co-authored-by: Jongyoul Lee <jo...@gmail.com>
---
 .github/workflows/core.yml                         |   3 +-
 dev/create_release.sh                              |   3 +-
 docs/_includes/themes/zeppelin/_navigation.html    |   1 -
 docs/index.md                                      |   1 -
 docs/interpreter/ksql.md                           |  78 -------
 ksql/README.md                                     |  10 -
 ksql/pom.xml                                       |  88 -------
 .../apache/zeppelin/ksql/BasicKSQLHttpClient.java  | 175 --------------
 .../org/apache/zeppelin/ksql/KSQLInterpreter.java  | 169 --------------
 .../java/org/apache/zeppelin/ksql/KSQLRequest.java |  51 ----
 .../org/apache/zeppelin/ksql/KSQLResponse.java     |  86 -------
 .../org/apache/zeppelin/ksql/KSQLRestService.java  | 257 ---------------------
 ksql/src/main/resources/interpreter-setting.json   |  21 --
 .../apache/zeppelin/ksql/KSQLInterpreterTest.java  | 171 --------------
 ksql/src/test/resources/log4j.properties           |  30 ---
 pom.xml                                            |   1 -
 16 files changed, 3 insertions(+), 1142 deletions(-)

diff --git a/.github/workflows/core.yml b/.github/workflows/core.yml
index 43d843267c..3ea6a07aa6 100644
--- a/.github/workflows/core.yml
+++ b/.github/workflows/core.yml
@@ -82,8 +82,7 @@ jobs:
   interpreter-test-non-core:
     runs-on: ubuntu-20.04
     env:
-      INTERPRETERS: 'hbase,jdbc,file,flink-cmd,cassandra,elasticsearch,bigquery,alluxio,livy,groovy,java,neo4j,submarine,sparql,mongodb,influxdb,ksql'
-
+      INTERPRETERS: 'hbase,jdbc,file,flink-cmd,cassandra,elasticsearch,bigquery,alluxio,livy,groovy,java,neo4j,submarine,sparql,mongodb,influxdb'
     steps:
       - name: Checkout
         uses: actions/checkout@v2
diff --git a/dev/create_release.sh b/dev/create_release.sh
index faca259cab..028e553650 100755
--- a/dev/create_release.sh
+++ b/dev/create_release.sh
@@ -98,7 +98,8 @@ function make_binary_release() {
 git_clone
 make_source_package
 
-make_binary_release netinst "-Pweb-angular -Phadoop-2.6 -pl !hbase,!jdbc,!file,!flink,!cassandra,!elasticsearch,!bigquery,!alluxio,!livy,!groovy,!java,!neo4j,!submarine,!sparql,!mongodb,!ksql -am"
+make_binary_release netinst "-Pweb-angular -Phadoop-2.6 -pl !hbase,!jdbc,!file,!flink,!cassandra,!elasticsearch,!bigquery,!alluxio,!livy,!groovy,!java,!neo4j,!submarine,!sparql,!mongodb -am"
+
 make_binary_release all "-Pweb-angular -Phadoop-2.6"
 
 # remove non release files and dirs
diff --git a/docs/_includes/themes/zeppelin/_navigation.html b/docs/_includes/themes/zeppelin/_navigation.html
index 9fa12a7995..b741c21832 100644
--- a/docs/_includes/themes/zeppelin/_navigation.html
+++ b/docs/_includes/themes/zeppelin/_navigation.html
@@ -150,7 +150,6 @@
                 <li><a href="{{BASE_PATH}}/interpreter/java.html">Java</a></li>
                 <li><a href="{{BASE_PATH}}/interpreter/jupyter.html">Jupyter</a></li>
                 <li><a href="{{BASE_PATH}}/interpreter/kotlin.html">Kotlin</a></li>
-                <li><a href="{{BASE_PATH}}/interpreter/ksql.html">KSQL</a></li>
                 <li><a href="{{BASE_PATH}}/interpreter/livy.html">Livy</a></li>
                 <li><a href="{{BASE_PATH}}/interpreter/mahout.html">Mahout</a></li>
                 <li><a href="{{BASE_PATH}}/interpreter/markdown.html">Markdown</a></li>
diff --git a/docs/index.md b/docs/index.md
index 29af54878d..f0cd7355ed 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -148,7 +148,6 @@ limitations under the License.
   * [JDBC](./interpreter/jdbc.html)
   * [Jupyter](./interpreter/jupyter.html)
   * [Kotlin](./interpreter/kotlin.html)
-  * [KSQL](./interpreter/ksql.html)
   * [Livy](./interpreter/livy.html)
   * [Mahout](./interpreter/mahout.html)
   * [Markdown](./interpreter/markdown.html)
diff --git a/docs/interpreter/ksql.md b/docs/interpreter/ksql.md
deleted file mode 100644
index 2a308bed4b..0000000000
--- a/docs/interpreter/ksql.md
+++ /dev/null
@@ -1,78 +0,0 @@
----
-layout: page
-title: "KSQL Interpreter for Apache Zeppelin"
-description: "SQL is the streaming SQL engine for Apache Kafka and provides an easy-to-use yet powerful interactive SQL interface for stream processing on Kafka."
-group: interpreter
----
-<!--
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
--->
-{% include JB/setup %}
-
-# KSQL Interpreter for Apache Zeppelin
-
-<div id="toc"></div>
-
-## Overview
-[KSQL](https://www.confluent.io/product/ksql/) is the streaming SQL engine for Apache Kafka®. It provides an easy-to-use yet powerful interactive SQL interface for stream processing on Kafka,
-
-## Configuration
-<table class="table-configuration">
-  <thead>
-    <tr>
-      <th>Property</th>
-      <th>Default</th>
-      <th>Description</th>
-    </tr>
-  </thead>
-  <tbody>
-    <tr>
-      <td>ksql.url</td>
-      <td>http://localhost:8080</td>
-      <td>The KSQL Endpoint base URL</td>
-    </tr>
-  </tbody>
-</table>
-
-N.b. The interpreter supports all the KSQL properties, i.e. `ksql.streams.auto.offset.reset`.
-The full list of KSQL parameters is [here](https://docs.confluent.io/current/ksql/docs/installation/server-config/config-reference.html).
-
-## Using the KSQL Interpreter
-In a paragraph, use `%ksql` and start your SQL query in order to start to interact with KSQL.
-
-Following some examples:
-
-```
-%ksql
-PRINT 'orders';
-```
-
-![PRINT image]({{BASE_PATH}}/assets/themes/zeppelin/img/docs-img/ksql.1.png)
-
-```
-%ksql
-CREATE STREAM ORDERS WITH
-  (VALUE_FORMAT='AVRO',
-   KAFKA_TOPIC ='orders');
-```
-
-![CREATE image]({{BASE_PATH}}/assets/themes/zeppelin/img/docs-img/ksql.2.png)
-
-```
-%ksql
-SELECT *
-FROM ORDERS
-LIMIT 10
-```
-
-![LIMIT image]({{BASE_PATH}}/assets/themes/zeppelin/img/docs-img/ksql.3.png)
\ No newline at end of file
diff --git a/ksql/README.md b/ksql/README.md
deleted file mode 100644
index 22f89f2847..0000000000
--- a/ksql/README.md
+++ /dev/null
@@ -1,10 +0,0 @@
-# Overview
-KSQL interpreter for Apache Zeppelin
-
-# Connection
-The Interpreter opens a connection with the KSQL REST endpoint.
-
-# Confluent KSQL resources
-Following a list of useful resources:
- * [Docs](https://docs.confluent.io/current/ksql/docs/index.html)
- * [Getting Started](https://github.com/confluentinc/demo-scene/blob/master/ksql-intro/demo_ksql-intro.adoc)
diff --git a/ksql/pom.xml b/ksql/pom.xml
deleted file mode 100644
index 67820ea894..0000000000
--- a/ksql/pom.xml
+++ /dev/null
@@ -1,88 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one or more
-  ~ contributor license agreements.  See the NOTICE file distributed with
-  ~ this work for additional information regarding copyright ownership.
-  ~ The ASF licenses this file to You under the Apache License, Version 2.0
-  ~ (the "License"); you may not use this file except in compliance with
-  ~ the License.  You may obtain a copy of the License at
-  ~
-  ~    http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-
-  <parent>
-    <artifactId>zeppelin-interpreter-parent</artifactId>
-    <groupId>org.apache.zeppelin</groupId>
-    <version>0.11.0-SNAPSHOT</version>
-    <relativePath>../zeppelin-interpreter-parent/pom.xml</relativePath>
-  </parent>
-
-  <artifactId>zeppelin-ksql</artifactId>
-  <packaging>jar</packaging>
-  <name>Zeppelin: Kafka SQL interpreter</name>
-
-  <properties>
-    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-    <interpreter.name>ksql</interpreter.name>
-  </properties>
-
-  <dependencies>
-    <dependency>
-      <groupId>com.fasterxml.jackson.core</groupId>
-      <artifactId>jackson-databind</artifactId>
-      <version>2.12.6.1</version>
-    </dependency>
-
-    <dependency>
-      <groupId>commons-io</groupId>
-      <artifactId>commons-io</artifactId>
-      <version>2.11.0</version>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.commons</groupId>
-      <artifactId>commons-lang3</artifactId>
-    </dependency>
-
-
-    <dependency>
-      <groupId>org.mockito</groupId>
-      <artifactId>mockito-core</artifactId>
-      <version>3.0.0</version>
-      <scope>test</scope>
-    </dependency>
-
-  </dependencies>
-
-  <build>
-    <plugins>
-      <plugin>
-        <artifactId>maven-enforcer-plugin</artifactId>
-      </plugin>
-      <plugin>
-        <artifactId>maven-resources-plugin</artifactId>
-      </plugin>
-      <plugin>
-        <artifactId>maven-shade-plugin</artifactId>
-      </plugin>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-checkstyle-plugin</artifactId>
-        <configuration>
-          <skip>false</skip>
-        </configuration>
-      </plugin>
-    </plugins>
-  </build>
-
-</project>
diff --git a/ksql/src/main/java/org/apache/zeppelin/ksql/BasicKSQLHttpClient.java b/ksql/src/main/java/org/apache/zeppelin/ksql/BasicKSQLHttpClient.java
deleted file mode 100644
index 937a5d3570..0000000000
--- a/ksql/src/main/java/org/apache/zeppelin/ksql/BasicKSQLHttpClient.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.ksql;
-
-import org.apache.commons.io.IOUtils;
-
-import java.io.BufferedReader;
-import java.io.Closeable;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.OutputStream;
-import java.net.HttpURLConnection;
-import java.net.URL;
-import java.nio.charset.StandardCharsets;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-public class BasicKSQLHttpClient implements Closeable {
-
-  interface BasicHTTPClientResponse {
-    void onMessage(int status, String message);
-
-    void onError(int status, String message);
-  }
-
-  private final String jsonData;
-  private final Map<String, Object> formData;
-  private final String type;
-  private final Map<String, String> headers;
-  private final URL url;
-  private HttpURLConnection connection;
-  private final int timeout;
-  private boolean connected;
-
-
-  public BasicKSQLHttpClient(String url, String jsonData, Map<String, Object> formData,
-                 String type, Map<String, String> headers, int timeout)
-      throws IOException {
-    this.url = new URL(url);
-    this.jsonData = jsonData;
-    this.formData = formData;
-    this.type = type;
-    this.headers = headers;
-    this.timeout = timeout;
-    this.connected = false;
-  }
-
-  @Override
-  public void close() throws IOException {
-    connected = false;
-    if (connection != null) {
-      connection.disconnect();
-    }
-  }
-
-  private void writeOutput(String data) throws IOException {
-    try (OutputStream os = connection.getOutputStream()) {
-      byte[] input = data.getBytes(StandardCharsets.UTF_8);
-      os.write(input);
-    }
-  }
-
-  public String connect() throws IOException {
-    int status = createConnection();
-    boolean isStatusOk = isStatusOk(status);
-    return IOUtils.toString(isStatusOk ?
-        connection.getInputStream() : connection.getErrorStream(), StandardCharsets.UTF_8.name());
-  }
-
-  public void connectAsync(BasicHTTPClientResponse onResponse) throws IOException {
-    int status = createConnection();
-    boolean isStatusOk = isStatusOk(status);
-    long start = System.currentTimeMillis();
-
-    try (InputStreamReader in = new InputStreamReader(connection.getInputStream(),
-            StandardCharsets.UTF_8);
-         BufferedReader br = new BufferedReader(in)) {
-      while (connected && (timeout == -1 || System.currentTimeMillis() - start < timeout)) {
-        if (br.ready()) {
-          String responseLine = br.readLine();
-          if (responseLine == null || responseLine.isEmpty()) {
-            continue;
-          }
-          if (isStatusOk) {
-            onResponse.onMessage(status, responseLine.trim());
-          } else {
-            onResponse.onError(status, responseLine.trim());
-          }
-        }
-      }
-    }
-  }
-
-  private boolean isStatusOk(int status) {
-    return status >= 200 && status < 300;
-  }
-
-  private int createConnection() throws IOException {
-    this.connection = (HttpURLConnection) url.openConnection();
-    connection.setRequestMethod(this.type);
-    this.headers.forEach((k, v) -> connection.setRequestProperty(k, v));
-    connection.setDoOutput(true);
-    if (jsonData != null && !jsonData.isEmpty()) {
-      writeOutput(jsonData);
-    } else if (formData != null && !formData.isEmpty()) {
-      String queryStringParams = formData.entrySet()
-          .stream()
-          .map(e -> e.getKey() + "=" + e.getValue())
-          .collect(Collectors.joining("&"));
-      writeOutput(queryStringParams);
-    }
-    connected = true;
-    return connection.getResponseCode();
-  }
-
-  static class Builder {
-    private String url;
-    private String json;
-    private Map<String, Object> formData = new HashMap<>();
-    private String type;
-    private Map<String, String> headers = new HashMap<>();
-    private int timeout = -1;
-
-    public Builder withTimeout(int timeout) {
-      this.timeout = timeout;
-      return this;
-    }
-
-    public Builder withUrl(String url) {
-      this.url = url;
-      return this;
-    }
-
-    public Builder withJson(String json) {
-      this.json = json;
-      return this;
-    }
-
-    public Builder withType(String type) {
-      this.type = type;
-      return this;
-    }
-
-    public Builder withHeader(String header, String value) {
-      this.headers.put(header, value);
-      return this;
-    }
-
-    public Builder withFormData(String name, Object value) {
-      this.formData.put(name, value);
-      return this;
-    }
-
-    public BasicKSQLHttpClient build() throws IOException {
-      return new BasicKSQLHttpClient(url, json, formData, type, headers, timeout);
-    }
-
-  }
-}
diff --git a/ksql/src/main/java/org/apache/zeppelin/ksql/KSQLInterpreter.java b/ksql/src/main/java/org/apache/zeppelin/ksql/KSQLInterpreter.java
deleted file mode 100644
index 92cb8da2f2..0000000000
--- a/ksql/src/main/java/org/apache/zeppelin/ksql/KSQLInterpreter.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.zeppelin.ksql;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.zeppelin.interpreter.Interpreter;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterException;
-import org.apache.zeppelin.interpreter.InterpreterResult;
-import org.apache.zeppelin.interpreter.util.InterpreterOutputStream;
-import org.apache.zeppelin.scheduler.Scheduler;
-import org.apache.zeppelin.scheduler.SchedulerFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.LinkedHashSet;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.stream.Collectors;
-
-
-public class KSQLInterpreter extends Interpreter {
-  private static final String NEW_LINE = "\n";
-
-  private static final Logger LOGGER = LoggerFactory.getLogger(KSQLInterpreter.class);
-  public static final String TABLE_DELIMITER = "\t";
-
-  private InterpreterOutputStream interpreterOutput = new InterpreterOutputStream(LOGGER);
-
-  private final KSQLRestService ksqlRestService;
-
-  private static final ObjectMapper json = new ObjectMapper();
-
-  public KSQLInterpreter(Properties properties) {
-    this(properties, new KSQLRestService(properties.entrySet().stream()
-            .collect(Collectors.toMap(e -> e.getKey().toString(),
-                e -> e.getValue() != null ? e.getValue().toString() : null))));
-  }
-
-  // VisibleForTesting
-  public KSQLInterpreter(Properties properties, KSQLRestService ksqlRestService) {
-    super(properties);
-    this.ksqlRestService = ksqlRestService;
-  }
-
-  @Override
-  public void open() throws InterpreterException {}
-
-  @Override
-  public void close() throws InterpreterException {
-    ksqlRestService.close();
-  }
-
-  private String writeValueAsString(Object data) {
-    try {
-      if (data instanceof Collection || data instanceof Map) {
-        return json.writeValueAsString(data);
-      }
-      if (data instanceof String) {
-        return (String) data;
-      }
-      return String.valueOf(data);
-    } catch (JsonProcessingException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  private void checkResponseErrors(String message) throws IOException {
-    if (StringUtils.isNotBlank(message)) {
-      // throw new RuntimeException(message);
-      interpreterOutput.getInterpreterOutput().write("%text");
-      interpreterOutput.getInterpreterOutput().write(NEW_LINE);
-      interpreterOutput.getInterpreterOutput().write(message);
-    }
-  }
-
-  @Override
-  public InterpreterResult interpret(String query,
-        InterpreterContext context) throws InterpreterException {
-    if (StringUtils.isBlank(query)) {
-      return new InterpreterResult(InterpreterResult.Code.SUCCESS);
-    }
-    interpreterOutput.setInterpreterOutput(context.out);
-    try {
-      interpreterOutput.getInterpreterOutput().flush();
-      interpreterOutput.getInterpreterOutput().write("%table");
-      interpreterOutput.getInterpreterOutput().write(NEW_LINE);
-      Set<String> header = new LinkedHashSet<>();
-      executeQuery(context.getParagraphId(), query.trim(), header);
-      return new InterpreterResult(InterpreterResult.Code.SUCCESS);
-    } catch (IOException e) {
-      return new InterpreterResult(InterpreterResult.Code.ERROR, e.getMessage());
-    }
-  }
-
-  private void executeQuery(final String paragraphId,
-        final String query, Set<String> header) throws IOException {
-    AtomicBoolean isFirstLine = new AtomicBoolean(true);
-    ksqlRestService
-            .executeQuery(paragraphId, query, (resp) -> {
-              try {
-                if (resp.getRow() == null || resp.getRow().isEmpty()) {
-                  return;
-                }
-                if (isFirstLine.get()) {
-                  isFirstLine.set(false);
-                  header.addAll(resp.getRow().keySet());
-                  interpreterOutput.getInterpreterOutput().write(header.stream()
-                          .collect(Collectors.joining(TABLE_DELIMITER)));
-                  interpreterOutput.getInterpreterOutput().write(NEW_LINE);
-                }
-                interpreterOutput.getInterpreterOutput().write(resp.getRow().values().stream()
-                        .map(this::writeValueAsString)
-                        .collect(Collectors.joining(TABLE_DELIMITER)));
-                interpreterOutput.getInterpreterOutput().write(NEW_LINE);
-                checkResponseErrors(resp.getFinalMessage());
-                checkResponseErrors(resp.getErrorMessage());
-              } catch (IOException e) {
-                throw new RuntimeException(e);
-              }
-            });
-  }
-
-  @Override
-  public void cancel(InterpreterContext context) throws InterpreterException {
-    LOGGER.info("Trying to cancel paragraphId {}", context.getParagraphId());
-    try {
-      ksqlRestService.closeClient(context.getParagraphId());
-      LOGGER.info("Removed");
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  @Override
-  public FormType getFormType() throws InterpreterException {
-    return FormType.SIMPLE;
-  }
-
-  @Override
-  public int getProgress(InterpreterContext context) throws InterpreterException {
-    return 0;
-  }
-
-  @Override
-  public Scheduler getScheduler() {
-    return SchedulerFactory.singleton().createOrGetFIFOScheduler(
-            KSQLInterpreter.class.getName() + this.hashCode());
-  }
-}
diff --git a/ksql/src/main/java/org/apache/zeppelin/ksql/KSQLRequest.java b/ksql/src/main/java/org/apache/zeppelin/ksql/KSQLRequest.java
deleted file mode 100644
index a05a70d3ab..0000000000
--- a/ksql/src/main/java/org/apache/zeppelin/ksql/KSQLRequest.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.ksql;
-
-import java.util.Collections;
-import java.util.Map;
-import java.util.Objects;
-
-public class KSQLRequest {
-
-  private static final String EXPLAIN_QUERY = "EXPLAIN %s";
-  private final String ksql;
-  private final Map<String, String> streamsProperties;
-
-  KSQLRequest(final String ksql, final Map<String, String> streamsProperties) {
-    String inputQuery = Objects.requireNonNull(ksql, "ksql")
-        .replaceAll("[\\n\\t\\r]", " ")
-        .trim();
-    this.ksql = inputQuery.endsWith(";") ? inputQuery : inputQuery + ";";
-    this.streamsProperties = streamsProperties;
-  }
-
-  KSQLRequest(final String ksql) {
-    this(ksql, Collections.emptyMap());
-  }
-
-  KSQLRequest toExplainRequest() {
-    return new KSQLRequest(String.format(EXPLAIN_QUERY, this.ksql), this.streamsProperties);
-  }
-
-  public String getKsql() {
-    return ksql;
-  }
-
-  public Map<String, String> getStreamsProperties() {
-    return streamsProperties;
-  }
-}
diff --git a/ksql/src/main/java/org/apache/zeppelin/ksql/KSQLResponse.java b/ksql/src/main/java/org/apache/zeppelin/ksql/KSQLResponse.java
deleted file mode 100644
index 46461351db..0000000000
--- a/ksql/src/main/java/org/apache/zeppelin/ksql/KSQLResponse.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.ksql;
-
-import java.util.AbstractMap;
-import java.util.Collections;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.function.Function;
-import java.util.stream.Collector;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-public class KSQLResponse {
-  private final Map<String, Object> row;
-  private final String finalMessage;
-  private final String errorMessage;
-  private final boolean terminal;
-
-  private <T, K, U> Collector<T, ?, Map<K, U>>
-      toLinkedHashMap(Function<? super T, ? extends K> keyMapper,
-        Function<? super T, ? extends U> valueMapper) {
-    return Collectors.toMap(
-        keyMapper,
-        valueMapper,
-        (u, v) -> { throw new IllegalStateException(String.format("Duplicate key %s", u)); },
-        LinkedHashMap::new);
-  }
-
-  KSQLResponse(final List<String> fields, final Map<String, Object> row,
-         final String finalMessage, final String errorMessage, boolean terminal) {
-    List<Object> columns = row == null ? null : (List<Object>) row.getOrDefault("columns",
-        Collections.emptyList());
-    this.row = row == null ? null : IntStream.range(0, columns.size())
-        .mapToObj(index -> new AbstractMap.SimpleEntry<>(fields.get(index),
-            columns.get(index)))
-        .collect(toLinkedHashMap(e -> e.getKey(), e -> e.getValue()));
-    this.finalMessage = finalMessage;
-    this.errorMessage = errorMessage;
-    this.terminal = terminal;
-  }
-
-  KSQLResponse(final List<String> fields, final Map<String, Object> resp) {
-    this(fields, (Map<String, Object>) resp.get("row"),
-        (String) resp.get("finalMessage"),
-        (String) resp.get("errorMessage"),
-        (boolean) resp.get("terminal"));
-  }
-
-  KSQLResponse(final Map<String, Object> resp) {
-    this.row = resp;
-    this.finalMessage = null;
-    this.errorMessage = null;
-    this.terminal = true;
-  }
-
-  public Map<String, Object> getRow() {
-    return row;
-  }
-
-  public String getFinalMessage() {
-    return finalMessage;
-  }
-
-  public String getErrorMessage() {
-    return errorMessage;
-  }
-
-  public boolean isTerminal() {
-    return terminal;
-  }
-}
diff --git a/ksql/src/main/java/org/apache/zeppelin/ksql/KSQLRestService.java b/ksql/src/main/java/org/apache/zeppelin/ksql/KSQLRestService.java
deleted file mode 100644
index 4fcb2de1ed..0000000000
--- a/ksql/src/main/java/org/apache/zeppelin/ksql/KSQLRestService.java
+++ /dev/null
@@ -1,257 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.ksql;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.commons.lang3.StringUtils;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.function.Consumer;
-import java.util.stream.Collectors;
-
-public class KSQLRestService {
-
-  private static final String KSQL_ENDPOINT = "%s/ksql";
-  private static final String QUERY_ENDPOINT = "%s/query";
-
-  private static final String KSQL_V1_CONTENT_TYPE = "application/vnd.ksql.v1+json; charset=utf-8";
-
-  private static final List<String> KSQL_COMMON_FIELDS = Arrays
-      .asList("statementText", "warnings", "@type");
-  private static final String KSQL_URL = "ksql.url";
-
-  private static final ObjectMapper json = new ObjectMapper();
-
-  private final String ksqlUrl;
-  private final String queryUrl;
-  private final String baseUrl;
-  private final Map<String, String> streamsProperties;
-
-  private final Map<String, BasicKSQLHttpClient> clientCache;
-
-  public KSQLRestService(Map<String, String> props) {
-    baseUrl = Objects.requireNonNull(props.get(KSQL_URL), KSQL_URL).toString();
-    ksqlUrl = String.format(KSQL_ENDPOINT, baseUrl);
-    queryUrl = String.format(QUERY_ENDPOINT, baseUrl);
-    clientCache = new ConcurrentHashMap<>();
-    this.streamsProperties = props.entrySet().stream()
-            .filter(e -> e.getKey().startsWith("ksql.") && !e.getKey().equals(KSQL_URL))
-            .collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue()));
-  }
-
-
-  public void executeQuery(final String paragraphId, final String query,
-               final Consumer<KSQLResponse> callback) throws IOException {
-    KSQLRequest request = new KSQLRequest(query, streamsProperties);
-    if (isSelect(request)) {
-      executeSelect(paragraphId, callback, request);
-    } else if (isPrint(request)) {
-      executePrint(paragraphId, callback, request);
-    } else {
-      executeKSQL(paragraphId, callback, request);
-    }
-  }
-
-  private void executeKSQL(String paragraphId, Consumer<KSQLResponse> callback,
-        KSQLRequest request) throws IOException {
-    try (BasicKSQLHttpClient client = createNewClient(paragraphId, request, ksqlUrl)) {
-      List<Map<String, Object>> queryResponse = json.readValue(client.connect(), List.class);
-      queryResponse.stream()
-              .map(map -> excludeKSQLCommonFields(map))
-              .flatMap(map -> map.entrySet().stream()
-                  .filter(e -> e.getValue() instanceof List)
-                  .flatMap(e -> ((List<Map<String, Object>>) e.getValue()).stream()))
-              .map(KSQLResponse::new)
-              .forEach(callback::accept);
-      queryResponse.stream()
-              .map(map -> excludeKSQLCommonFields(map))
-              .flatMap(map -> map.entrySet().stream()
-                      .filter(e -> e.getValue() instanceof Map)
-                      .map(e -> (Map<String, Object>) e.getValue()))
-              .map(KSQLResponse::new)
-              .forEach(callback::accept);
-    }
-  }
-
-  private Map<String, Object> excludeKSQLCommonFields(Map<String, Object> map) {
-    return map.entrySet().stream()
-        .filter(e -> !KSQL_COMMON_FIELDS.contains(e.getKey()))
-        .collect(Collectors
-            .toMap(e -> e.getKey(), e -> e.getValue()));
-  }
-
-  private BasicKSQLHttpClient createNewClient(String paragraphId, KSQLRequest request,
-        String url) throws IOException {
-    BasicKSQLHttpClient client = new BasicKSQLHttpClient.Builder()
-            .withUrl(url)
-            .withJson(json.writeValueAsString(request))
-            .withType("POST")
-            .withHeader("Content-type", KSQL_V1_CONTENT_TYPE)
-            .build();
-    BasicKSQLHttpClient oldClient = clientCache.put(paragraphId, client);
-    if (oldClient != null) {
-      oldClient.close();
-    }
-    return client;
-  }
-
-  private void executeSelect(String paragraphId, Consumer<KSQLResponse> callback,
-        KSQLRequest request) throws IOException {
-    List<String> fieldNames = getFields(request);
-    if (fieldNames.isEmpty()) {
-      throw new RuntimeException("Field are empty");
-    }
-    try (BasicKSQLHttpClient client = createNewClient(paragraphId, request, queryUrl)) {
-      client.connectAsync(new BasicKSQLHttpClient.BasicHTTPClientResponse() {
-        @Override
-        public void onMessage(int status, String message) {
-          try {
-            Map<String, Object> queryResponse = json.readValue(message, LinkedHashMap.class);
-            KSQLResponse resp = new KSQLResponse(fieldNames, queryResponse);
-            callback.accept(resp);
-            if (resp.isTerminal() || StringUtils.isNotBlank(resp.getErrorMessage())
-                    || StringUtils.isNotBlank(resp.getFinalMessage())) {
-              client.close();
-            }
-          } catch (IOException e) {
-            throw new RuntimeException(e);
-          }
-        }
-
-        @Override
-        public void onError(int status, String message) {
-          try {
-            KSQLResponse resp = new KSQLResponse(Collections.singletonMap("error", message));
-            callback.accept(resp);
-            client.close();
-          } catch (IOException e) {
-            throw new RuntimeException(e);
-          }
-        }
-      });
-    }
-  }
-
-  private void executePrint(String paragraphId, Consumer<KSQLResponse> callback,
-                             KSQLRequest request) throws IOException {
-    try (BasicKSQLHttpClient client = createNewClient(paragraphId, request, queryUrl)) {
-      client.connectAsync(new BasicKSQLHttpClient.BasicHTTPClientResponse() {
-        @Override
-        public void onMessage(int status, String message) {
-          if (message.toUpperCase().startsWith("FORMAT:")) {
-            return;
-          }
-          List<String> elements = Arrays.asList(message.split(","));
-          Map<String, Object> row = new LinkedHashMap<>();
-          row.put("timestamp", elements.get(0));
-          row.put("offset", elements.get(1));
-          row.put("record", String.join("", elements.subList(2, elements.size())));
-          KSQLResponse resp = new KSQLResponse(row);
-          callback.accept(resp);
-        }
-
-        @Override
-        public void onError(int status, String message) {
-          try {
-            KSQLResponse resp = new KSQLResponse(Collections.singletonMap("error", message));
-            callback.accept(resp);
-            client.close();
-          } catch (IOException e) {
-            throw new RuntimeException(e);
-          }
-        }
-      });
-    }
-  }
-
-  private boolean isSelect(KSQLRequest request) {
-    return request.getKsql().toUpperCase().startsWith("SELECT");
-  }
-
-  private boolean isPrint(KSQLRequest request) {
-    return request.getKsql().toUpperCase().startsWith("PRINT");
-  }
-
-  public void closeClient(final String paragraphId) throws IOException {
-    BasicKSQLHttpClient toClose = clientCache.remove(paragraphId);
-    if (toClose != null) {
-      toClose.close();
-    }
-  }
-
-  private List<String> getFields(KSQLRequest request) throws IOException {
-    return getFields(request, false);
-  }
-
-  private List<String> getFields(KSQLRequest request, boolean tryCoerce) throws IOException {
-    if (tryCoerce) {
-      /*
-       * this because a query like
-       * `EXPLAIN SELECT * FROM ORDERS WHERE ADDRESS->STATE = 'New York' LIMIT 10;`
-       * fails with the message `Column STATE cannot be resolved`
-       * so we try to coerce the field resolution
-       */
-      String query = request.getKsql()
-          .substring(0, request.getKsql().toUpperCase().indexOf("WHERE"));
-      request = new KSQLRequest(query, request.getStreamsProperties());
-    }
-    try (BasicKSQLHttpClient client = new BasicKSQLHttpClient.Builder()
-        .withUrl(ksqlUrl)
-        .withJson(json.writeValueAsString(request.toExplainRequest()))
-        .withType("POST")
-        .withHeader("Content-type", KSQL_V1_CONTENT_TYPE)
-        .build()) {
-      List<Map<String, Object>> explainResponseList = json.readValue(client.connect(), List.class);
-      Map<String, Object> explainResponse = explainResponseList.get(0);
-      Map<String, Object> queryDescription = (Map<String, Object>) explainResponse
-          .getOrDefault("queryDescription", Collections.emptyMap());
-      List<Map<String, Object>> fields = (List<Map<String, Object>>) queryDescription
-          .getOrDefault("fields", Collections.emptyList());
-      return fields.stream()
-          .map(elem -> elem.getOrDefault("name", "").toString())
-          .filter(s -> !s.isEmpty())
-          .collect(Collectors.toList());
-    } catch (IOException e) {
-      if (!tryCoerce) {
-        return getFields(request, true);
-      } else {
-        throw e;
-      }
-    }
-  }
-
-
-  public void close() {
-    Set<String> keys = clientCache.keySet();
-    keys.forEach(key -> {
-      try {
-        closeClient(key);
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
-    });
-  }
-}
diff --git a/ksql/src/main/resources/interpreter-setting.json b/ksql/src/main/resources/interpreter-setting.json
deleted file mode 100644
index cf15bbf2e8..0000000000
--- a/ksql/src/main/resources/interpreter-setting.json
+++ /dev/null
@@ -1,21 +0,0 @@
-[
-  {
-    "group": "ksql",
-    "name": "ksql",
-    "className": "org.apache.zeppelin.ksql.KSQLInterpreter",
-    "properties": {
-      "ksql.url": {
-        "envName": null,
-        "propertyName": "ksql.url",
-        "defaultValue": "http://localhost:8088",
-        "description": "KSQL Endpoint base URL",
-        "type": "string"
-      }
-    },
-    "editor": {
-      "language": "sql",
-      "editOnDblClick": false,
-      "completionSupport": false
-    }
-  }
-]
diff --git a/ksql/src/test/java/org/apache/zeppelin/ksql/KSQLInterpreterTest.java b/ksql/src/test/java/org/apache/zeppelin/ksql/KSQLInterpreterTest.java
deleted file mode 100644
index 0fd01fd2a6..0000000000
--- a/ksql/src/test/java/org/apache/zeppelin/ksql/KSQLInterpreterTest.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.zeppelin.ksql;
-
-import org.apache.zeppelin.interpreter.Interpreter;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterException;
-import org.apache.zeppelin.interpreter.InterpreterOutput;
-import org.apache.zeppelin.interpreter.InterpreterResult;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.Mockito;
-import org.mockito.stubbing.Stubber;
-
-import java.io.IOException;
-import java.util.AbstractMap;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.function.Consumer;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-import java.util.stream.Stream;
-
-import static org.junit.Assert.assertEquals;
-
-public class KSQLInterpreterTest {
-
-  private InterpreterContext context;
-
-  private static final Map<String, String> PROPS = new HashMap<String, String>() {{
-      put("ksql.url", "http://localhost:8088");
-      put("ksql.streams.auto.offset.reset", "earliest");
-    }};
-
-
-  @Before
-  public void setUpZeppelin() throws IOException {
-    context = InterpreterContext.builder()
-        .setInterpreterOut(new InterpreterOutput())
-        .setParagraphId("ksql-test")
-        .build();
-  }
-
-  @Test
-  public void shouldRenderKSQLSelectAsTable() throws InterpreterException,
-      IOException, InterruptedException {
-    // given
-    Properties p = new Properties();
-    p.putAll(PROPS);
-    KSQLRestService service = Mockito.mock(KSQLRestService.class);
-    Stubber stubber = Mockito.doAnswer((invocation) -> {
-      Consumer<  KSQLResponse> callback = (Consumer<  KSQLResponse>)
-            invocation.getArguments()[2];
-      IntStream.range(1, 5)
-          .forEach(i -> {
-            Map<String, Object> map = new HashMap<>();
-            if (i == 4) {
-              map.put("row", null);
-              map.put("terminal", true);
-            } else {
-              map.put("row", Collections.singletonMap("columns", Arrays.asList("value " + i)));
-              map.put("terminal", false);
-            }
-            callback.accept(new KSQLResponse(Arrays.asList("fieldName"), map));
-            try {
-              Thread.sleep(3000);
-            } catch (InterruptedException e) {
-              e.printStackTrace();
-            }
-          });
-      return null;
-    });
-    stubber.when(service).executeQuery(Mockito.any(String.class),
-          Mockito.anyString(),
-          Mockito.any(Consumer.class));
-    Interpreter interpreter = new KSQLInterpreter(p, service);
-
-    // when
-    String query = "select * from orders";
-    interpreter.interpret(query, context);
-
-    // then
-    String expected = "%table fieldName\n" +
-        "value 1\n" +
-        "value 2\n" +
-        "value 3\n";
-    context.out.flush();
-    assertEquals(1, context.out.toInterpreterResultMessage().size());
-    assertEquals(expected, context.out.toInterpreterResultMessage().get(0).toString());
-    assertEquals(InterpreterResult.Type.TABLE, context.out
-        .toInterpreterResultMessage().get(0).getType());
-    interpreter.close();
-  }
-
-  @Test
-  public void shouldRenderKSQLNonSelectAsTable() throws InterpreterException,
-      IOException, InterruptedException {
-    // given
-    Properties p = new Properties();
-    p.putAll(PROPS);
-    KSQLRestService service = Mockito.mock(KSQLRestService.class);
-    Map<String, Object> row1 = new HashMap<>();
-    row1.put("name", "orders");
-    row1.put("registered", "false");
-    row1.put("replicaInfo", "[1]");
-    row1.put("consumerCount", "0");
-    row1.put("consumerGroupCount", "0");
-    Map<String, Object> row2 = new HashMap<>();
-    row2.put("name", "orders");
-    row2.put("registered", "false");
-    row2.put("replicaInfo", "[1]");
-    row2.put("consumerCount", "0");
-    row2.put("consumerGroupCount", "0");
-    Stubber stubber = Mockito.doAnswer((invocation) -> {
-      Consumer<  KSQLResponse> callback = (Consumer<  KSQLResponse>)
-            invocation.getArguments()[2];
-      callback.accept(new KSQLResponse(row1));
-      callback.accept(new KSQLResponse(row2));
-      return null;
-    });
-    stubber.when(service).executeQuery(
-        Mockito.any(String.class),
-        Mockito.anyString(),
-        Mockito.any(Consumer.class));
-    Interpreter interpreter = new KSQLInterpreter(p, service);
-
-    // when
-    String query = "show topics";
-    interpreter.interpret(query, context);
-
-    // then
-    List<Map<String, Object>> expected = Arrays.asList(row1, row2);
-
-    context.out.flush();
-    String[] lines = context.out.toInterpreterResultMessage()
-        .get(0).toString()
-        .replace("%table ", "")
-        .trim()
-        .split("\n");
-    List<String[]> rows = Stream.of(lines)
-        .map(line -> line.split("\t"))
-        .collect(Collectors.toList());
-    List<Map<String, String>> actual = rows.stream()
-        .skip(1)
-        .map(row -> IntStream.range(0, row.length)
-            .mapToObj(index -> new AbstractMap.SimpleEntry<>(rows.get(0)[index], row[index]))
-            .collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue())))
-        .collect(Collectors.toList());
-    assertEquals(1, context.out.toInterpreterResultMessage().size());
-    assertEquals(expected, actual);
-    assertEquals(InterpreterResult.Type.TABLE, context.out
-        .toInterpreterResultMessage().get(0).getType());
-  }
-}
diff --git a/ksql/src/test/resources/log4j.properties b/ksql/src/test/resources/log4j.properties
deleted file mode 100644
index 4f78acf5ac..0000000000
--- a/ksql/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,30 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# Direct log messages to stdout
-log4j.appender.stdout=org.apache.log4j.ConsoleAppender
-log4j.appender.stdout.Target=System.out
-log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %5p [%t] %c:%L - %m%n
-#log4j.appender.stdout.layout.ConversionPattern=
-#%5p [%t] (%F:%L) - %m%n
-#%-4r [%t] %-5p %c %x - %m%n
-#
-
-# Root logger option
-log4j.rootLogger=INFO, stdout
-#log4j.logger.org.apache.zeppelin.interpreter=DEBUG
diff --git a/pom.xml b/pom.xml
index af54768c4d..17c6dbbaec 100644
--- a/pom.xml
+++ b/pom.xml
@@ -83,7 +83,6 @@
     <module>alluxio</module>
     <module>neo4j</module>
     <module>java</module>
-    <module>ksql</module>
     <module>sparql</module>
     <module>zeppelin-common</module>
     <module>zeppelin-client</module>