You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by jo...@apache.org on 2022/08/03 08:44:10 UTC
[zeppelin] branch master updated: [ZEPPELIN-5785] Remove ksql interpreter (#4434)
This is an automated email from the ASF dual-hosted git repository.
jongyoul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push:
new 9b9e9a6057 [ZEPPELIN-5785] Remove ksql interpreter (#4434)
9b9e9a6057 is described below
commit 9b9e9a6057eb56e432818115c4caf64d1254350f
Author: jshjsh06 <35...@users.noreply.github.com>
AuthorDate: Wed Aug 3 17:44:01 2022 +0900
[ZEPPELIN-5785] Remove ksql interpreter (#4434)
Co-authored-by: Jongyoul Lee <jo...@gmail.com>
---
.github/workflows/core.yml | 3 +-
dev/create_release.sh | 3 +-
docs/_includes/themes/zeppelin/_navigation.html | 1 -
docs/index.md | 1 -
docs/interpreter/ksql.md | 78 -------
ksql/README.md | 10 -
ksql/pom.xml | 88 -------
.../apache/zeppelin/ksql/BasicKSQLHttpClient.java | 175 --------------
.../org/apache/zeppelin/ksql/KSQLInterpreter.java | 169 --------------
.../java/org/apache/zeppelin/ksql/KSQLRequest.java | 51 ----
.../org/apache/zeppelin/ksql/KSQLResponse.java | 86 -------
.../org/apache/zeppelin/ksql/KSQLRestService.java | 257 ---------------------
ksql/src/main/resources/interpreter-setting.json | 21 --
.../apache/zeppelin/ksql/KSQLInterpreterTest.java | 171 --------------
ksql/src/test/resources/log4j.properties | 30 ---
pom.xml | 1 -
16 files changed, 3 insertions(+), 1142 deletions(-)
diff --git a/.github/workflows/core.yml b/.github/workflows/core.yml
index 43d843267c..3ea6a07aa6 100644
--- a/.github/workflows/core.yml
+++ b/.github/workflows/core.yml
@@ -82,8 +82,7 @@ jobs:
interpreter-test-non-core:
runs-on: ubuntu-20.04
env:
- INTERPRETERS: 'hbase,jdbc,file,flink-cmd,cassandra,elasticsearch,bigquery,alluxio,livy,groovy,java,neo4j,submarine,sparql,mongodb,influxdb,ksql'
-
+ INTERPRETERS: 'hbase,jdbc,file,flink-cmd,cassandra,elasticsearch,bigquery,alluxio,livy,groovy,java,neo4j,submarine,sparql,mongodb,influxdb'
steps:
- name: Checkout
uses: actions/checkout@v2
diff --git a/dev/create_release.sh b/dev/create_release.sh
index faca259cab..028e553650 100755
--- a/dev/create_release.sh
+++ b/dev/create_release.sh
@@ -98,7 +98,8 @@ function make_binary_release() {
git_clone
make_source_package
-make_binary_release netinst "-Pweb-angular -Phadoop-2.6 -pl !hbase,!jdbc,!file,!flink,!cassandra,!elasticsearch,!bigquery,!alluxio,!livy,!groovy,!java,!neo4j,!submarine,!sparql,!mongodb,!ksql -am"
+make_binary_release netinst "-Pweb-angular -Phadoop-2.6 -pl !hbase,!jdbc,!file,!flink,!cassandra,!elasticsearch,!bigquery,!alluxio,!livy,!groovy,!java,!neo4j,!submarine,!sparql,!mongodb -am"
+
make_binary_release all "-Pweb-angular -Phadoop-2.6"
# remove non release files and dirs
diff --git a/docs/_includes/themes/zeppelin/_navigation.html b/docs/_includes/themes/zeppelin/_navigation.html
index 9fa12a7995..b741c21832 100644
--- a/docs/_includes/themes/zeppelin/_navigation.html
+++ b/docs/_includes/themes/zeppelin/_navigation.html
@@ -150,7 +150,6 @@
<li><a href="{{BASE_PATH}}/interpreter/java.html">Java</a></li>
<li><a href="{{BASE_PATH}}/interpreter/jupyter.html">Jupyter</a></li>
<li><a href="{{BASE_PATH}}/interpreter/kotlin.html">Kotlin</a></li>
- <li><a href="{{BASE_PATH}}/interpreter/ksql.html">KSQL</a></li>
<li><a href="{{BASE_PATH}}/interpreter/livy.html">Livy</a></li>
<li><a href="{{BASE_PATH}}/interpreter/mahout.html">Mahout</a></li>
<li><a href="{{BASE_PATH}}/interpreter/markdown.html">Markdown</a></li>
diff --git a/docs/index.md b/docs/index.md
index 29af54878d..f0cd7355ed 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -148,7 +148,6 @@ limitations under the License.
* [JDBC](./interpreter/jdbc.html)
* [Jupyter](./interpreter/jupyter.html)
* [Kotlin](./interpreter/kotlin.html)
- * [KSQL](./interpreter/ksql.html)
* [Livy](./interpreter/livy.html)
* [Mahout](./interpreter/mahout.html)
* [Markdown](./interpreter/markdown.html)
diff --git a/docs/interpreter/ksql.md b/docs/interpreter/ksql.md
deleted file mode 100644
index 2a308bed4b..0000000000
--- a/docs/interpreter/ksql.md
+++ /dev/null
@@ -1,78 +0,0 @@
----
-layout: page
-title: "KSQL Interpreter for Apache Zeppelin"
-description: "SQL is the streaming SQL engine for Apache Kafka and provides an easy-to-use yet powerful interactive SQL interface for stream processing on Kafka."
-group: interpreter
----
-<!--
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
--->
-{% include JB/setup %}
-
-# KSQL Interpreter for Apache Zeppelin
-
-<div id="toc"></div>
-
-## Overview
-[KSQL](https://www.confluent.io/product/ksql/) is the streaming SQL engine for Apache Kafka®. It provides an easy-to-use yet powerful interactive SQL interface for stream processing on Kafka,
-
-## Configuration
-<table class="table-configuration">
- <thead>
- <tr>
- <th>Property</th>
- <th>Default</th>
- <th>Description</th>
- </tr>
- </thead>
- <tbody>
- <tr>
- <td>ksql.url</td>
- <td>http://localhost:8080</td>
- <td>The KSQL Endpoint base URL</td>
- </tr>
- </tbody>
-</table>
-
-N.b. The interpreter supports all the KSQL properties, i.e. `ksql.streams.auto.offset.reset`.
-The full list of KSQL parameters is [here](https://docs.confluent.io/current/ksql/docs/installation/server-config/config-reference.html).
-
-## Using the KSQL Interpreter
-In a paragraph, use `%ksql` and start your SQL query in order to start to interact with KSQL.
-
-Following some examples:
-
-```
-%ksql
-PRINT 'orders';
-```
-
-![PRINT image]({{BASE_PATH}}/assets/themes/zeppelin/img/docs-img/ksql.1.png)
-
-```
-%ksql
-CREATE STREAM ORDERS WITH
- (VALUE_FORMAT='AVRO',
- KAFKA_TOPIC ='orders');
-```
-
-![CREATE image]({{BASE_PATH}}/assets/themes/zeppelin/img/docs-img/ksql.2.png)
-
-```
-%ksql
-SELECT *
-FROM ORDERS
-LIMIT 10
-```
-
-![LIMIT image]({{BASE_PATH}}/assets/themes/zeppelin/img/docs-img/ksql.3.png)
\ No newline at end of file
diff --git a/ksql/README.md b/ksql/README.md
deleted file mode 100644
index 22f89f2847..0000000000
--- a/ksql/README.md
+++ /dev/null
@@ -1,10 +0,0 @@
-# Overview
-KSQL interpreter for Apache Zeppelin
-
-# Connection
-The Interpreter opens a connection with the KSQL REST endpoint.
-
-# Confluent KSQL resources
-Following a list of useful resources:
- * [Docs](https://docs.confluent.io/current/ksql/docs/index.html)
- * [Getting Started](https://github.com/confluentinc/demo-scene/blob/master/ksql-intro/demo_ksql-intro.adoc)
diff --git a/ksql/pom.xml b/ksql/pom.xml
deleted file mode 100644
index 67820ea894..0000000000
--- a/ksql/pom.xml
+++ /dev/null
@@ -1,88 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one or more
- ~ contributor license agreements. See the NOTICE file distributed with
- ~ this work for additional information regarding copyright ownership.
- ~ The ASF licenses this file to You under the Apache License, Version 2.0
- ~ (the "License"); you may not use this file except in compliance with
- ~ the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <artifactId>zeppelin-interpreter-parent</artifactId>
- <groupId>org.apache.zeppelin</groupId>
- <version>0.11.0-SNAPSHOT</version>
- <relativePath>../zeppelin-interpreter-parent/pom.xml</relativePath>
- </parent>
-
- <artifactId>zeppelin-ksql</artifactId>
- <packaging>jar</packaging>
- <name>Zeppelin: Kafka SQL interpreter</name>
-
- <properties>
- <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <interpreter.name>ksql</interpreter.name>
- </properties>
-
- <dependencies>
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-databind</artifactId>
- <version>2.12.6.1</version>
- </dependency>
-
- <dependency>
- <groupId>commons-io</groupId>
- <artifactId>commons-io</artifactId>
- <version>2.11.0</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-lang3</artifactId>
- </dependency>
-
-
- <dependency>
- <groupId>org.mockito</groupId>
- <artifactId>mockito-core</artifactId>
- <version>3.0.0</version>
- <scope>test</scope>
- </dependency>
-
- </dependencies>
-
- <build>
- <plugins>
- <plugin>
- <artifactId>maven-enforcer-plugin</artifactId>
- </plugin>
- <plugin>
- <artifactId>maven-resources-plugin</artifactId>
- </plugin>
- <plugin>
- <artifactId>maven-shade-plugin</artifactId>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-checkstyle-plugin</artifactId>
- <configuration>
- <skip>false</skip>
- </configuration>
- </plugin>
- </plugins>
- </build>
-
-</project>
diff --git a/ksql/src/main/java/org/apache/zeppelin/ksql/BasicKSQLHttpClient.java b/ksql/src/main/java/org/apache/zeppelin/ksql/BasicKSQLHttpClient.java
deleted file mode 100644
index 937a5d3570..0000000000
--- a/ksql/src/main/java/org/apache/zeppelin/ksql/BasicKSQLHttpClient.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.ksql;
-
-import org.apache.commons.io.IOUtils;
-
-import java.io.BufferedReader;
-import java.io.Closeable;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.OutputStream;
-import java.net.HttpURLConnection;
-import java.net.URL;
-import java.nio.charset.StandardCharsets;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-public class BasicKSQLHttpClient implements Closeable {
-
- interface BasicHTTPClientResponse {
- void onMessage(int status, String message);
-
- void onError(int status, String message);
- }
-
- private final String jsonData;
- private final Map<String, Object> formData;
- private final String type;
- private final Map<String, String> headers;
- private final URL url;
- private HttpURLConnection connection;
- private final int timeout;
- private boolean connected;
-
-
- public BasicKSQLHttpClient(String url, String jsonData, Map<String, Object> formData,
- String type, Map<String, String> headers, int timeout)
- throws IOException {
- this.url = new URL(url);
- this.jsonData = jsonData;
- this.formData = formData;
- this.type = type;
- this.headers = headers;
- this.timeout = timeout;
- this.connected = false;
- }
-
- @Override
- public void close() throws IOException {
- connected = false;
- if (connection != null) {
- connection.disconnect();
- }
- }
-
- private void writeOutput(String data) throws IOException {
- try (OutputStream os = connection.getOutputStream()) {
- byte[] input = data.getBytes(StandardCharsets.UTF_8);
- os.write(input);
- }
- }
-
- public String connect() throws IOException {
- int status = createConnection();
- boolean isStatusOk = isStatusOk(status);
- return IOUtils.toString(isStatusOk ?
- connection.getInputStream() : connection.getErrorStream(), StandardCharsets.UTF_8.name());
- }
-
- public void connectAsync(BasicHTTPClientResponse onResponse) throws IOException {
- int status = createConnection();
- boolean isStatusOk = isStatusOk(status);
- long start = System.currentTimeMillis();
-
- try (InputStreamReader in = new InputStreamReader(connection.getInputStream(),
- StandardCharsets.UTF_8);
- BufferedReader br = new BufferedReader(in)) {
- while (connected && (timeout == -1 || System.currentTimeMillis() - start < timeout)) {
- if (br.ready()) {
- String responseLine = br.readLine();
- if (responseLine == null || responseLine.isEmpty()) {
- continue;
- }
- if (isStatusOk) {
- onResponse.onMessage(status, responseLine.trim());
- } else {
- onResponse.onError(status, responseLine.trim());
- }
- }
- }
- }
- }
-
- private boolean isStatusOk(int status) {
- return status >= 200 && status < 300;
- }
-
- private int createConnection() throws IOException {
- this.connection = (HttpURLConnection) url.openConnection();
- connection.setRequestMethod(this.type);
- this.headers.forEach((k, v) -> connection.setRequestProperty(k, v));
- connection.setDoOutput(true);
- if (jsonData != null && !jsonData.isEmpty()) {
- writeOutput(jsonData);
- } else if (formData != null && !formData.isEmpty()) {
- String queryStringParams = formData.entrySet()
- .stream()
- .map(e -> e.getKey() + "=" + e.getValue())
- .collect(Collectors.joining("&"));
- writeOutput(queryStringParams);
- }
- connected = true;
- return connection.getResponseCode();
- }
-
- static class Builder {
- private String url;
- private String json;
- private Map<String, Object> formData = new HashMap<>();
- private String type;
- private Map<String, String> headers = new HashMap<>();
- private int timeout = -1;
-
- public Builder withTimeout(int timeout) {
- this.timeout = timeout;
- return this;
- }
-
- public Builder withUrl(String url) {
- this.url = url;
- return this;
- }
-
- public Builder withJson(String json) {
- this.json = json;
- return this;
- }
-
- public Builder withType(String type) {
- this.type = type;
- return this;
- }
-
- public Builder withHeader(String header, String value) {
- this.headers.put(header, value);
- return this;
- }
-
- public Builder withFormData(String name, Object value) {
- this.formData.put(name, value);
- return this;
- }
-
- public BasicKSQLHttpClient build() throws IOException {
- return new BasicKSQLHttpClient(url, json, formData, type, headers, timeout);
- }
-
- }
-}
diff --git a/ksql/src/main/java/org/apache/zeppelin/ksql/KSQLInterpreter.java b/ksql/src/main/java/org/apache/zeppelin/ksql/KSQLInterpreter.java
deleted file mode 100644
index 92cb8da2f2..0000000000
--- a/ksql/src/main/java/org/apache/zeppelin/ksql/KSQLInterpreter.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.zeppelin.ksql;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.zeppelin.interpreter.Interpreter;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterException;
-import org.apache.zeppelin.interpreter.InterpreterResult;
-import org.apache.zeppelin.interpreter.util.InterpreterOutputStream;
-import org.apache.zeppelin.scheduler.Scheduler;
-import org.apache.zeppelin.scheduler.SchedulerFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.LinkedHashSet;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.stream.Collectors;
-
-
-public class KSQLInterpreter extends Interpreter {
- private static final String NEW_LINE = "\n";
-
- private static final Logger LOGGER = LoggerFactory.getLogger(KSQLInterpreter.class);
- public static final String TABLE_DELIMITER = "\t";
-
- private InterpreterOutputStream interpreterOutput = new InterpreterOutputStream(LOGGER);
-
- private final KSQLRestService ksqlRestService;
-
- private static final ObjectMapper json = new ObjectMapper();
-
- public KSQLInterpreter(Properties properties) {
- this(properties, new KSQLRestService(properties.entrySet().stream()
- .collect(Collectors.toMap(e -> e.getKey().toString(),
- e -> e.getValue() != null ? e.getValue().toString() : null))));
- }
-
- // VisibleForTesting
- public KSQLInterpreter(Properties properties, KSQLRestService ksqlRestService) {
- super(properties);
- this.ksqlRestService = ksqlRestService;
- }
-
- @Override
- public void open() throws InterpreterException {}
-
- @Override
- public void close() throws InterpreterException {
- ksqlRestService.close();
- }
-
- private String writeValueAsString(Object data) {
- try {
- if (data instanceof Collection || data instanceof Map) {
- return json.writeValueAsString(data);
- }
- if (data instanceof String) {
- return (String) data;
- }
- return String.valueOf(data);
- } catch (JsonProcessingException e) {
- throw new RuntimeException(e);
- }
- }
-
- private void checkResponseErrors(String message) throws IOException {
- if (StringUtils.isNotBlank(message)) {
- // throw new RuntimeException(message);
- interpreterOutput.getInterpreterOutput().write("%text");
- interpreterOutput.getInterpreterOutput().write(NEW_LINE);
- interpreterOutput.getInterpreterOutput().write(message);
- }
- }
-
- @Override
- public InterpreterResult interpret(String query,
- InterpreterContext context) throws InterpreterException {
- if (StringUtils.isBlank(query)) {
- return new InterpreterResult(InterpreterResult.Code.SUCCESS);
- }
- interpreterOutput.setInterpreterOutput(context.out);
- try {
- interpreterOutput.getInterpreterOutput().flush();
- interpreterOutput.getInterpreterOutput().write("%table");
- interpreterOutput.getInterpreterOutput().write(NEW_LINE);
- Set<String> header = new LinkedHashSet<>();
- executeQuery(context.getParagraphId(), query.trim(), header);
- return new InterpreterResult(InterpreterResult.Code.SUCCESS);
- } catch (IOException e) {
- return new InterpreterResult(InterpreterResult.Code.ERROR, e.getMessage());
- }
- }
-
- private void executeQuery(final String paragraphId,
- final String query, Set<String> header) throws IOException {
- AtomicBoolean isFirstLine = new AtomicBoolean(true);
- ksqlRestService
- .executeQuery(paragraphId, query, (resp) -> {
- try {
- if (resp.getRow() == null || resp.getRow().isEmpty()) {
- return;
- }
- if (isFirstLine.get()) {
- isFirstLine.set(false);
- header.addAll(resp.getRow().keySet());
- interpreterOutput.getInterpreterOutput().write(header.stream()
- .collect(Collectors.joining(TABLE_DELIMITER)));
- interpreterOutput.getInterpreterOutput().write(NEW_LINE);
- }
- interpreterOutput.getInterpreterOutput().write(resp.getRow().values().stream()
- .map(this::writeValueAsString)
- .collect(Collectors.joining(TABLE_DELIMITER)));
- interpreterOutput.getInterpreterOutput().write(NEW_LINE);
- checkResponseErrors(resp.getFinalMessage());
- checkResponseErrors(resp.getErrorMessage());
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- });
- }
-
- @Override
- public void cancel(InterpreterContext context) throws InterpreterException {
- LOGGER.info("Trying to cancel paragraphId {}", context.getParagraphId());
- try {
- ksqlRestService.closeClient(context.getParagraphId());
- LOGGER.info("Removed");
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public FormType getFormType() throws InterpreterException {
- return FormType.SIMPLE;
- }
-
- @Override
- public int getProgress(InterpreterContext context) throws InterpreterException {
- return 0;
- }
-
- @Override
- public Scheduler getScheduler() {
- return SchedulerFactory.singleton().createOrGetFIFOScheduler(
- KSQLInterpreter.class.getName() + this.hashCode());
- }
-}
diff --git a/ksql/src/main/java/org/apache/zeppelin/ksql/KSQLRequest.java b/ksql/src/main/java/org/apache/zeppelin/ksql/KSQLRequest.java
deleted file mode 100644
index a05a70d3ab..0000000000
--- a/ksql/src/main/java/org/apache/zeppelin/ksql/KSQLRequest.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.ksql;
-
-import java.util.Collections;
-import java.util.Map;
-import java.util.Objects;
-
-public class KSQLRequest {
-
- private static final String EXPLAIN_QUERY = "EXPLAIN %s";
- private final String ksql;
- private final Map<String, String> streamsProperties;
-
- KSQLRequest(final String ksql, final Map<String, String> streamsProperties) {
- String inputQuery = Objects.requireNonNull(ksql, "ksql")
- .replaceAll("[\\n\\t\\r]", " ")
- .trim();
- this.ksql = inputQuery.endsWith(";") ? inputQuery : inputQuery + ";";
- this.streamsProperties = streamsProperties;
- }
-
- KSQLRequest(final String ksql) {
- this(ksql, Collections.emptyMap());
- }
-
- KSQLRequest toExplainRequest() {
- return new KSQLRequest(String.format(EXPLAIN_QUERY, this.ksql), this.streamsProperties);
- }
-
- public String getKsql() {
- return ksql;
- }
-
- public Map<String, String> getStreamsProperties() {
- return streamsProperties;
- }
-}
diff --git a/ksql/src/main/java/org/apache/zeppelin/ksql/KSQLResponse.java b/ksql/src/main/java/org/apache/zeppelin/ksql/KSQLResponse.java
deleted file mode 100644
index 46461351db..0000000000
--- a/ksql/src/main/java/org/apache/zeppelin/ksql/KSQLResponse.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.ksql;
-
-import java.util.AbstractMap;
-import java.util.Collections;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.function.Function;
-import java.util.stream.Collector;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-public class KSQLResponse {
- private final Map<String, Object> row;
- private final String finalMessage;
- private final String errorMessage;
- private final boolean terminal;
-
- private <T, K, U> Collector<T, ?, Map<K, U>>
- toLinkedHashMap(Function<? super T, ? extends K> keyMapper,
- Function<? super T, ? extends U> valueMapper) {
- return Collectors.toMap(
- keyMapper,
- valueMapper,
- (u, v) -> { throw new IllegalStateException(String.format("Duplicate key %s", u)); },
- LinkedHashMap::new);
- }
-
- KSQLResponse(final List<String> fields, final Map<String, Object> row,
- final String finalMessage, final String errorMessage, boolean terminal) {
- List<Object> columns = row == null ? null : (List<Object>) row.getOrDefault("columns",
- Collections.emptyList());
- this.row = row == null ? null : IntStream.range(0, columns.size())
- .mapToObj(index -> new AbstractMap.SimpleEntry<>(fields.get(index),
- columns.get(index)))
- .collect(toLinkedHashMap(e -> e.getKey(), e -> e.getValue()));
- this.finalMessage = finalMessage;
- this.errorMessage = errorMessage;
- this.terminal = terminal;
- }
-
- KSQLResponse(final List<String> fields, final Map<String, Object> resp) {
- this(fields, (Map<String, Object>) resp.get("row"),
- (String) resp.get("finalMessage"),
- (String) resp.get("errorMessage"),
- (boolean) resp.get("terminal"));
- }
-
- KSQLResponse(final Map<String, Object> resp) {
- this.row = resp;
- this.finalMessage = null;
- this.errorMessage = null;
- this.terminal = true;
- }
-
- public Map<String, Object> getRow() {
- return row;
- }
-
- public String getFinalMessage() {
- return finalMessage;
- }
-
- public String getErrorMessage() {
- return errorMessage;
- }
-
- public boolean isTerminal() {
- return terminal;
- }
-}
diff --git a/ksql/src/main/java/org/apache/zeppelin/ksql/KSQLRestService.java b/ksql/src/main/java/org/apache/zeppelin/ksql/KSQLRestService.java
deleted file mode 100644
index 4fcb2de1ed..0000000000
--- a/ksql/src/main/java/org/apache/zeppelin/ksql/KSQLRestService.java
+++ /dev/null
@@ -1,257 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.ksql;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.commons.lang3.StringUtils;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.function.Consumer;
-import java.util.stream.Collectors;
-
-public class KSQLRestService {
-
- private static final String KSQL_ENDPOINT = "%s/ksql";
- private static final String QUERY_ENDPOINT = "%s/query";
-
- private static final String KSQL_V1_CONTENT_TYPE = "application/vnd.ksql.v1+json; charset=utf-8";
-
- private static final List<String> KSQL_COMMON_FIELDS = Arrays
- .asList("statementText", "warnings", "@type");
- private static final String KSQL_URL = "ksql.url";
-
- private static final ObjectMapper json = new ObjectMapper();
-
- private final String ksqlUrl;
- private final String queryUrl;
- private final String baseUrl;
- private final Map<String, String> streamsProperties;
-
- private final Map<String, BasicKSQLHttpClient> clientCache;
-
- public KSQLRestService(Map<String, String> props) {
- baseUrl = Objects.requireNonNull(props.get(KSQL_URL), KSQL_URL).toString();
- ksqlUrl = String.format(KSQL_ENDPOINT, baseUrl);
- queryUrl = String.format(QUERY_ENDPOINT, baseUrl);
- clientCache = new ConcurrentHashMap<>();
- this.streamsProperties = props.entrySet().stream()
- .filter(e -> e.getKey().startsWith("ksql.") && !e.getKey().equals(KSQL_URL))
- .collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue()));
- }
-
-
- public void executeQuery(final String paragraphId, final String query,
- final Consumer<KSQLResponse> callback) throws IOException {
- KSQLRequest request = new KSQLRequest(query, streamsProperties);
- if (isSelect(request)) {
- executeSelect(paragraphId, callback, request);
- } else if (isPrint(request)) {
- executePrint(paragraphId, callback, request);
- } else {
- executeKSQL(paragraphId, callback, request);
- }
- }
-
- private void executeKSQL(String paragraphId, Consumer<KSQLResponse> callback,
- KSQLRequest request) throws IOException {
- try (BasicKSQLHttpClient client = createNewClient(paragraphId, request, ksqlUrl)) {
- List<Map<String, Object>> queryResponse = json.readValue(client.connect(), List.class);
- queryResponse.stream()
- .map(map -> excludeKSQLCommonFields(map))
- .flatMap(map -> map.entrySet().stream()
- .filter(e -> e.getValue() instanceof List)
- .flatMap(e -> ((List<Map<String, Object>>) e.getValue()).stream()))
- .map(KSQLResponse::new)
- .forEach(callback::accept);
- queryResponse.stream()
- .map(map -> excludeKSQLCommonFields(map))
- .flatMap(map -> map.entrySet().stream()
- .filter(e -> e.getValue() instanceof Map)
- .map(e -> (Map<String, Object>) e.getValue()))
- .map(KSQLResponse::new)
- .forEach(callback::accept);
- }
- }
-
- private Map<String, Object> excludeKSQLCommonFields(Map<String, Object> map) {
- return map.entrySet().stream()
- .filter(e -> !KSQL_COMMON_FIELDS.contains(e.getKey()))
- .collect(Collectors
- .toMap(e -> e.getKey(), e -> e.getValue()));
- }
-
- private BasicKSQLHttpClient createNewClient(String paragraphId, KSQLRequest request,
- String url) throws IOException {
- BasicKSQLHttpClient client = new BasicKSQLHttpClient.Builder()
- .withUrl(url)
- .withJson(json.writeValueAsString(request))
- .withType("POST")
- .withHeader("Content-type", KSQL_V1_CONTENT_TYPE)
- .build();
- BasicKSQLHttpClient oldClient = clientCache.put(paragraphId, client);
- if (oldClient != null) {
- oldClient.close();
- }
- return client;
- }
-
- private void executeSelect(String paragraphId, Consumer<KSQLResponse> callback,
- KSQLRequest request) throws IOException {
- List<String> fieldNames = getFields(request);
- if (fieldNames.isEmpty()) {
- throw new RuntimeException("Field are empty");
- }
- try (BasicKSQLHttpClient client = createNewClient(paragraphId, request, queryUrl)) {
- client.connectAsync(new BasicKSQLHttpClient.BasicHTTPClientResponse() {
- @Override
- public void onMessage(int status, String message) {
- try {
- Map<String, Object> queryResponse = json.readValue(message, LinkedHashMap.class);
- KSQLResponse resp = new KSQLResponse(fieldNames, queryResponse);
- callback.accept(resp);
- if (resp.isTerminal() || StringUtils.isNotBlank(resp.getErrorMessage())
- || StringUtils.isNotBlank(resp.getFinalMessage())) {
- client.close();
- }
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public void onError(int status, String message) {
- try {
- KSQLResponse resp = new KSQLResponse(Collections.singletonMap("error", message));
- callback.accept(resp);
- client.close();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
- });
- }
- }
-
- private void executePrint(String paragraphId, Consumer<KSQLResponse> callback,
- KSQLRequest request) throws IOException {
- try (BasicKSQLHttpClient client = createNewClient(paragraphId, request, queryUrl)) {
- client.connectAsync(new BasicKSQLHttpClient.BasicHTTPClientResponse() {
- @Override
- public void onMessage(int status, String message) {
- if (message.toUpperCase().startsWith("FORMAT:")) {
- return;
- }
- List<String> elements = Arrays.asList(message.split(","));
- Map<String, Object> row = new LinkedHashMap<>();
- row.put("timestamp", elements.get(0));
- row.put("offset", elements.get(1));
- row.put("record", String.join("", elements.subList(2, elements.size())));
- KSQLResponse resp = new KSQLResponse(row);
- callback.accept(resp);
- }
-
- @Override
- public void onError(int status, String message) {
- try {
- KSQLResponse resp = new KSQLResponse(Collections.singletonMap("error", message));
- callback.accept(resp);
- client.close();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
- });
- }
- }
-
- private boolean isSelect(KSQLRequest request) {
- return request.getKsql().toUpperCase().startsWith("SELECT");
- }
-
- private boolean isPrint(KSQLRequest request) {
- return request.getKsql().toUpperCase().startsWith("PRINT");
- }
-
- public void closeClient(final String paragraphId) throws IOException {
- BasicKSQLHttpClient toClose = clientCache.remove(paragraphId);
- if (toClose != null) {
- toClose.close();
- }
- }
-
- private List<String> getFields(KSQLRequest request) throws IOException {
- return getFields(request, false);
- }
-
- private List<String> getFields(KSQLRequest request, boolean tryCoerce) throws IOException {
- if (tryCoerce) {
- /*
- * this because a query like
- * `EXPLAIN SELECT * FROM ORDERS WHERE ADDRESS->STATE = 'New York' LIMIT 10;`
- * fails with the message `Column STATE cannot be resolved`
- * so we try to coerce the field resolution
- */
- String query = request.getKsql()
- .substring(0, request.getKsql().toUpperCase().indexOf("WHERE"));
- request = new KSQLRequest(query, request.getStreamsProperties());
- }
- try (BasicKSQLHttpClient client = new BasicKSQLHttpClient.Builder()
- .withUrl(ksqlUrl)
- .withJson(json.writeValueAsString(request.toExplainRequest()))
- .withType("POST")
- .withHeader("Content-type", KSQL_V1_CONTENT_TYPE)
- .build()) {
- List<Map<String, Object>> explainResponseList = json.readValue(client.connect(), List.class);
- Map<String, Object> explainResponse = explainResponseList.get(0);
- Map<String, Object> queryDescription = (Map<String, Object>) explainResponse
- .getOrDefault("queryDescription", Collections.emptyMap());
- List<Map<String, Object>> fields = (List<Map<String, Object>>) queryDescription
- .getOrDefault("fields", Collections.emptyList());
- return fields.stream()
- .map(elem -> elem.getOrDefault("name", "").toString())
- .filter(s -> !s.isEmpty())
- .collect(Collectors.toList());
- } catch (IOException e) {
- if (!tryCoerce) {
- return getFields(request, true);
- } else {
- throw e;
- }
- }
- }
-
-
- public void close() {
- Set<String> keys = clientCache.keySet();
- keys.forEach(key -> {
- try {
- closeClient(key);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- });
- }
-}
diff --git a/ksql/src/main/resources/interpreter-setting.json b/ksql/src/main/resources/interpreter-setting.json
deleted file mode 100644
index cf15bbf2e8..0000000000
--- a/ksql/src/main/resources/interpreter-setting.json
+++ /dev/null
@@ -1,21 +0,0 @@
-[
- {
- "group": "ksql",
- "name": "ksql",
- "className": "org.apache.zeppelin.ksql.KSQLInterpreter",
- "properties": {
- "ksql.url": {
- "envName": null,
- "propertyName": "ksql.url",
- "defaultValue": "http://localhost:8088",
- "description": "KSQL Endpoint base URL",
- "type": "string"
- }
- },
- "editor": {
- "language": "sql",
- "editOnDblClick": false,
- "completionSupport": false
- }
- }
-]
diff --git a/ksql/src/test/java/org/apache/zeppelin/ksql/KSQLInterpreterTest.java b/ksql/src/test/java/org/apache/zeppelin/ksql/KSQLInterpreterTest.java
deleted file mode 100644
index 0fd01fd2a6..0000000000
--- a/ksql/src/test/java/org/apache/zeppelin/ksql/KSQLInterpreterTest.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.zeppelin.ksql;
-
-import org.apache.zeppelin.interpreter.Interpreter;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterException;
-import org.apache.zeppelin.interpreter.InterpreterOutput;
-import org.apache.zeppelin.interpreter.InterpreterResult;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.Mockito;
-import org.mockito.stubbing.Stubber;
-
-import java.io.IOException;
-import java.util.AbstractMap;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.function.Consumer;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-import java.util.stream.Stream;
-
-import static org.junit.Assert.assertEquals;
-
-public class KSQLInterpreterTest {
-
- private InterpreterContext context;
-
- private static final Map<String, String> PROPS = new HashMap<String, String>() {{
- put("ksql.url", "http://localhost:8088");
- put("ksql.streams.auto.offset.reset", "earliest");
- }};
-
-
- @Before
- public void setUpZeppelin() throws IOException {
- context = InterpreterContext.builder()
- .setInterpreterOut(new InterpreterOutput())
- .setParagraphId("ksql-test")
- .build();
- }
-
- @Test
- public void shouldRenderKSQLSelectAsTable() throws InterpreterException,
- IOException, InterruptedException {
- // given
- Properties p = new Properties();
- p.putAll(PROPS);
- KSQLRestService service = Mockito.mock(KSQLRestService.class);
- Stubber stubber = Mockito.doAnswer((invocation) -> {
- Consumer< KSQLResponse> callback = (Consumer< KSQLResponse>)
- invocation.getArguments()[2];
- IntStream.range(1, 5)
- .forEach(i -> {
- Map<String, Object> map = new HashMap<>();
- if (i == 4) {
- map.put("row", null);
- map.put("terminal", true);
- } else {
- map.put("row", Collections.singletonMap("columns", Arrays.asList("value " + i)));
- map.put("terminal", false);
- }
- callback.accept(new KSQLResponse(Arrays.asList("fieldName"), map));
- try {
- Thread.sleep(3000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- });
- return null;
- });
- stubber.when(service).executeQuery(Mockito.any(String.class),
- Mockito.anyString(),
- Mockito.any(Consumer.class));
- Interpreter interpreter = new KSQLInterpreter(p, service);
-
- // when
- String query = "select * from orders";
- interpreter.interpret(query, context);
-
- // then
- String expected = "%table fieldName\n" +
- "value 1\n" +
- "value 2\n" +
- "value 3\n";
- context.out.flush();
- assertEquals(1, context.out.toInterpreterResultMessage().size());
- assertEquals(expected, context.out.toInterpreterResultMessage().get(0).toString());
- assertEquals(InterpreterResult.Type.TABLE, context.out
- .toInterpreterResultMessage().get(0).getType());
- interpreter.close();
- }
-
- @Test
- public void shouldRenderKSQLNonSelectAsTable() throws InterpreterException,
- IOException, InterruptedException {
- // given
- Properties p = new Properties();
- p.putAll(PROPS);
- KSQLRestService service = Mockito.mock(KSQLRestService.class);
- Map<String, Object> row1 = new HashMap<>();
- row1.put("name", "orders");
- row1.put("registered", "false");
- row1.put("replicaInfo", "[1]");
- row1.put("consumerCount", "0");
- row1.put("consumerGroupCount", "0");
- Map<String, Object> row2 = new HashMap<>();
- row2.put("name", "orders");
- row2.put("registered", "false");
- row2.put("replicaInfo", "[1]");
- row2.put("consumerCount", "0");
- row2.put("consumerGroupCount", "0");
- Stubber stubber = Mockito.doAnswer((invocation) -> {
- Consumer< KSQLResponse> callback = (Consumer< KSQLResponse>)
- invocation.getArguments()[2];
- callback.accept(new KSQLResponse(row1));
- callback.accept(new KSQLResponse(row2));
- return null;
- });
- stubber.when(service).executeQuery(
- Mockito.any(String.class),
- Mockito.anyString(),
- Mockito.any(Consumer.class));
- Interpreter interpreter = new KSQLInterpreter(p, service);
-
- // when
- String query = "show topics";
- interpreter.interpret(query, context);
-
- // then
- List<Map<String, Object>> expected = Arrays.asList(row1, row2);
-
- context.out.flush();
- String[] lines = context.out.toInterpreterResultMessage()
- .get(0).toString()
- .replace("%table ", "")
- .trim()
- .split("\n");
- List<String[]> rows = Stream.of(lines)
- .map(line -> line.split("\t"))
- .collect(Collectors.toList());
- List<Map<String, String>> actual = rows.stream()
- .skip(1)
- .map(row -> IntStream.range(0, row.length)
- .mapToObj(index -> new AbstractMap.SimpleEntry<>(rows.get(0)[index], row[index]))
- .collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue())))
- .collect(Collectors.toList());
- assertEquals(1, context.out.toInterpreterResultMessage().size());
- assertEquals(expected, actual);
- assertEquals(InterpreterResult.Type.TABLE, context.out
- .toInterpreterResultMessage().get(0).getType());
- }
-}
diff --git a/ksql/src/test/resources/log4j.properties b/ksql/src/test/resources/log4j.properties
deleted file mode 100644
index 4f78acf5ac..0000000000
--- a/ksql/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,30 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# Direct log messages to stdout
-log4j.appender.stdout=org.apache.log4j.ConsoleAppender
-log4j.appender.stdout.Target=System.out
-log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %5p [%t] %c:%L - %m%n
-#log4j.appender.stdout.layout.ConversionPattern=
-#%5p [%t] (%F:%L) - %m%n
-#%-4r [%t] %-5p %c %x - %m%n
-#
-
-# Root logger option
-log4j.rootLogger=INFO, stdout
-#log4j.logger.org.apache.zeppelin.interpreter=DEBUG
diff --git a/pom.xml b/pom.xml
index af54768c4d..17c6dbbaec 100644
--- a/pom.xml
+++ b/pom.xml
@@ -83,7 +83,6 @@
<module>alluxio</module>
<module>neo4j</module>
<module>java</module>
- <module>ksql</module>
<module>sparql</module>
<module>zeppelin-common</module>
<module>zeppelin-client</module>