You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by mo...@apache.org on 2015/06/21 00:04:12 UTC
incubator-zeppelin git commit: [ZEPPELIN-63] Add Apache Ignite
interpreter
Repository: incubator-zeppelin
Updated Branches:
refs/heads/master c04440d22 -> 1020b7987
[ZEPPELIN-63] Add Apache Ignite interpreter
Author: Andrey Gura <ag...@gridgain.com>
Author: agura <ag...@gridgain.com>
Closes #108 from agura/master and squashes the following commits:
63fde14 [Andrey Gura] Update README.md
43c07b0 [agura] [ZEPPELIN-63] Add Apache Ignite interpreter
Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/1020b798
Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/1020b798
Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/1020b798
Branch: refs/heads/master
Commit: 1020b79871b5b09b9fdaf5876d377d4e3300edfa
Parents: c04440d
Author: Andrey Gura <ag...@gridgain.com>
Authored: Thu Jun 18 20:18:17 2015 +0300
Committer: Lee moon soo <mo...@apache.org>
Committed: Sat Jun 20 15:04:06 2015 -0700
----------------------------------------------------------------------
README.md | 4 +
conf/zeppelin-site.xml.template | 2 +-
ignite/pom.xml | 174 ++++++++++
.../zeppelin/ignite/IgniteInterpreter.java | 343 +++++++++++++++++++
.../zeppelin/ignite/IgniteInterpreterUtils.java | 44 +++
.../zeppelin/ignite/IgniteSqlInterpreter.java | 197 +++++++++++
.../zeppelin/ignite/IgniteInterpreterTest.java | 95 +++++
.../ignite/IgniteSqlInterpreterTest.java | 105 ++++++
.../java/org/apache/zeppelin/ignite/Person.java | 50 +++
pom.xml | 1 +
.../zeppelin/conf/ZeppelinConfiguration.java | 4 +-
11 files changed, 1017 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/1020b798/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index 4b0475a..fde2b4f 100644
--- a/README.md
+++ b/README.md
@@ -66,6 +66,10 @@ Yarn (Hadoop 2.2.x and later)
```
mvn clean package -Pspark-1.1 -Dhadoop.version=2.2.0 -Phadoop-2.2 -Pyarn -DskipTests
```
+Ignite (1.1.0-incubating and later)
+```
+mvn clean package -Dignite.version=1.1.0-incubating -DskipTests
+```
### Configure
If you wish to configure Zeppelin option (like port number), configure the following files:
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/1020b798/conf/zeppelin-site.xml.template
----------------------------------------------------------------------
diff --git a/conf/zeppelin-site.xml.template b/conf/zeppelin-site.xml.template
index cd72f12..b467a82 100644
--- a/conf/zeppelin-site.xml.template
+++ b/conf/zeppelin-site.xml.template
@@ -66,7 +66,7 @@
<property>
<name>zeppelin.interpreters</name>
- <value>org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.hive.HiveInterpreter,org.apache.zeppelin.tajo.TajoInterpreter,org.apache.zeppelin.flink.FlinkInterpreter</value>
+ <value>org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.hive.HiveInterpreter,org.apache.zeppelin.tajo.TajoInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter</value>
<description>Comma separated interpreter configurations. First interpreter become a default</description>
</property>
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/1020b798/ignite/pom.xml
----------------------------------------------------------------------
diff --git a/ignite/pom.xml b/ignite/pom.xml
new file mode 100644
index 0000000..8c14051
--- /dev/null
+++ b/ignite/pom.xml
@@ -0,0 +1,174 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>zeppelin</artifactId>
+ <groupId>org.apache.zeppelin</groupId>
+ <version>0.5.0-incubating-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>zeppelin-ignite</artifactId>
+ <packaging>jar</packaging>
+ <version>0.5.0-incubating-SNAPSHOT</version>
+ <name>Zeppelin: Apache Ignite interpreter</name>
+ <url>http://zeppelin.incubator.apache.org</url>
+
+ <properties>
+ <ignite.version>1.1.0-incubating</ignite.version>
+ <ignite.scala.binary.version>2.10</ignite.scala.binary.version>
+ <ignite.scala.version>2.10.4</ignite.scala.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>zeppelin-interpreter</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-core</artifactId>
+ <version>${ignite.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-spring</artifactId>
+ <version>${ignite.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-indexing</artifactId>
+ <version>${ignite.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-scalar</artifactId>
+ <version>${ignite.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ <version>${ignite.scala.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-compiler</artifactId>
+ <version>${ignite.scala.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-reflect</artifactId>
+ <version>${ignite.scala.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-deploy-plugin</artifactId>
+ <version>2.7</version>
+ <configuration>
+ <skip>true</skip>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <artifactId>maven-enforcer-plugin</artifactId>
+ <version>1.3.1</version>
+ <executions>
+ <execution>
+ <id>enforce</id>
+ <phase>none</phase>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <version>2.8</version>
+ <executions>
+ <execution>
+ <id>copy-dependencies</id>
+ <phase>package</phase>
+ <goals>
+ <goal>copy-dependencies</goal>
+ </goals>
+ <configuration>
+ <outputDirectory>${project.build.directory}/../../interpreter/ignite</outputDirectory>
+ <overWriteReleases>false</overWriteReleases>
+ <overWriteSnapshots>false</overWriteSnapshots>
+ <overWriteIfNewer>true</overWriteIfNewer>
+ <includeScope>runtime</includeScope>
+ </configuration>
+ </execution>
+ <execution>
+ <id>copy-artifact</id>
+ <phase>package</phase>
+ <goals>
+ <goal>copy</goal>
+ </goals>
+ <configuration>
+ <outputDirectory>${project.build.directory}/../../interpreter/ignite</outputDirectory>
+ <overWriteReleases>false</overWriteReleases>
+ <overWriteSnapshots>false</overWriteSnapshots>
+ <overWriteIfNewer>true</overWriteIfNewer>
+ <includeScope>runtime</includeScope>
+ <artifactItems>
+ <artifactItem>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>${project.artifactId}</artifactId>
+ <version>${project.version}</version>
+ <type>${project.packaging}</type>
+ </artifactItem>
+ </artifactItems>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/1020b798/ignite/src/main/java/org/apache/zeppelin/ignite/IgniteInterpreter.java
----------------------------------------------------------------------
diff --git a/ignite/src/main/java/org/apache/zeppelin/ignite/IgniteInterpreter.java b/ignite/src/main/java/org/apache/zeppelin/ignite/IgniteInterpreter.java
new file mode 100644
index 0000000..e09f310
--- /dev/null
+++ b/ignite/src/main/java/org/apache/zeppelin/ignite/IgniteInterpreter.java
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.ignite;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.zeppelin.interpreter.*;
+import org.apache.zeppelin.interpreter.InterpreterResult.Code;
+import org.apache.zeppelin.scheduler.Scheduler;
+import org.apache.zeppelin.scheduler.SchedulerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.PrintWriter;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Properties;
+
+import scala.Console;
+import scala.None;
+import scala.Some;
+import scala.tools.nsc.Settings;
+import scala.tools.nsc.interpreter.IMain;
+import scala.tools.nsc.interpreter.Results.Result;
+import scala.tools.nsc.settings.MutableSettings.BooleanSetting;
+import scala.tools.nsc.settings.MutableSettings.PathSetting;
+
+/**
+ * Apache Ignite interpreter (http://ignite.incubator.apache.org/).
+ *
+ * Runs Scala code through an embedded Scala REPL ({@code IMain}) and, when an Ignite node could
+ * be started, binds that node into the REPL session as the {@code ignite} value.
+ *
+ * Use the following properties for interpreter configuration:
+ *
+ * <ul>
+ *   <li>{@code ignite.addresses} - comma separated list of hosts in form {@code <host>:<port>}
+ *   or {@code <host>:<port_1>..<port_n>} </li>
+ *   <li>{@code ignite.clientMode} - indicates that Ignite interpreter
+ *   should start node in client mode ({@code true} or {@code false}).</li>
+ *   <li>{@code ignite.peerClassLoadingEnabled} - enables/disables peer class loading
+ *   ({@code true} or {@code false}).</li>
+ *   <li>{@code ignite.config.url} - URL for Ignite configuration. If this URL is specified then
+ *   all aforementioned properties will not be taken into account.</li>
+ * </ul>
+ */
+public class IgniteInterpreter extends Interpreter {
+  static final String IGNITE_ADDRESSES = "ignite.addresses";
+
+  static final String IGNITE_CLIENT_MODE = "ignite.clientMode";
+
+  static final String IGNITE_PEER_CLASS_LOADING_ENABLED = "ignite.peerClassLoadingEnabled";
+
+  static final String IGNITE_CFG_URL = "ignite.config.url";
+
+  // Registers this interpreter under the name "ignite" in the "ignite" group together with
+  // the default values and descriptions of its properties.
+  static {
+    Interpreter.register(
+        "ignite",
+        "ignite",
+        IgniteInterpreter.class.getName(),
+        new InterpreterPropertyBuilder()
+            .add(IGNITE_ADDRESSES, "127.0.0.1:47500..47509",
+                "Comma separated list of addresses "
+                    + "(e.g. 127.0.0.1:47500 or 127.0.0.1:47500..47509)")
+            .add(IGNITE_CLIENT_MODE, "true", "Client mode. true or false")
+            .add(IGNITE_CFG_URL, "", "Configuration URL. Overrides all other settings.")
+            .add(IGNITE_PEER_CLASS_LOADING_ENABLED, "true",
+                "Peer class loading enabled. true or false")
+            .build());
+  }
+
+  private final Logger logger = LoggerFactory.getLogger(IgniteInterpreter.class);
+
+  /** Ignite node started by this interpreter; {@code null} until started or after close. */
+  private Ignite ignite;
+
+  /** Buffer that collects the output of the Scala REPL and of {@code scala.Console}. */
+  private ByteArrayOutputStream out;
+
+  /** Embedded Scala REPL. */
+  private IMain imain;
+
+  /** Error raised during initialization; reported by every {@code interpret} call. */
+  private Throwable initEx;
+
+  public IgniteInterpreter(Properties property) {
+    super(property);
+  }
+
+  /**
+   * Creates the Scala REPL with a classpath assembled from the current class path and the
+   * interpreter class loader URLs, then starts Ignite and binds it into the REPL.
+   */
+  @Override
+  public void open() {
+    Settings settings = new Settings();
+
+    // NOTE(review): getClassloaderUrls() is not defined in this class; presumably it is
+    // inherited from Interpreter - confirm.
+    URL[] urls = getClassloaderUrls();
+
+    // set classpath
+    PathSetting pathSettings = settings.classpath();
+    StringBuilder sb = new StringBuilder();
+
+    for (File f : currentClassPath()) {
+      if (sb.length() > 0) {
+        sb.append(File.pathSeparator);
+      }
+      sb.append(f.getAbsolutePath());
+    }
+
+    if (urls != null) {
+      for (URL u : urls) {
+        if (sb.length() > 0) {
+          sb.append(File.pathSeparator);
+        }
+        sb.append(u.getFile());
+      }
+    }
+
+    pathSettings.v_$eq(sb.toString());
+    settings.scala$tools$nsc$settings$ScalaSettings$_setter_$classpath_$eq(pathSettings);
+
+    settings.explicitParentLoader_$eq(new Some<>(Thread.currentThread().getContextClassLoader()));
+
+    BooleanSetting b = (BooleanSetting) settings.usejavacp();
+    b.v_$eq(true);
+    settings.scala$tools$nsc$settings$StandardScalaSettings$_setter_$usejavacp_$eq(b);
+
+    out = new ByteArrayOutputStream();
+    imain = new IMain(settings, new PrintWriter(out));
+
+    initIgnite();
+  }
+
+  /**
+   * Collects the URLs of the context class loader followed by the entries of the
+   * {@code java.class.path} system property.
+   *
+   * @return class path entries as files.
+   */
+  private List<File> currentClassPath() {
+    List<File> paths = classPath(Thread.currentThread().getContextClassLoader());
+    String[] cps = System.getProperty("java.class.path").split(File.pathSeparator);
+
+    for (String cp : cps) {
+      paths.add(new File(cp));
+    }
+
+    return paths;
+  }
+
+  /**
+   * Lists the URLs of the given class loader as files.
+   *
+   * @param cl class loader; may be {@code null}.
+   * @return mutable list of files; empty if {@code cl} is {@code null} or is not a
+   *     {@link URLClassLoader}.
+   */
+  private List<File> classPath(ClassLoader cl) {
+    List<File> paths = new LinkedList<>();
+
+    if (cl == null) {
+      return paths;
+    }
+
+    if (cl instanceof URLClassLoader) {
+      URLClassLoader ucl = (URLClassLoader) cl;
+      URL[] urls = ucl.getURLs();
+      if (urls != null) {
+        for (URL url : urls) {
+          paths.add(new File(url.getFile()));
+        }
+      }
+    }
+
+    return paths;
+  }
+
+  /**
+   * Looks up the value of a term defined in the Scala REPL session.
+   *
+   * @param name term name.
+   * @return the term value, or {@code null} if the REPL reports no value for the term.
+   */
+  public Object getValue(String name) {
+    Object val = imain.valueOfTerm(name);
+
+    // The REPL answers with a scala.Option. Testing "instanceof scala.None" does not work from
+    // Java: that name resolves to the static-forwarder class of the None object, which has no
+    // instances (the singleton is of class scala.None$). Checking Option.isEmpty() covers it.
+    if (val instanceof scala.Option) {
+      scala.Option<?> opt = (scala.Option<?>) val;
+      return opt.isEmpty() ? null : opt.get();
+    }
+
+    return val;
+  }
+
+  /**
+   * Lazily starts the Ignite node, either from the configuration URL or from the individual
+   * interpreter properties. A start failure is stored in {@code initEx} instead of being thrown.
+   *
+   * @return the started node, or {@code null} if the node could not be started.
+   */
+  private Ignite getIgnite() {
+    if (ignite == null) {
+      try {
+        String cfgUrl = getProperty(IGNITE_CFG_URL);
+
+        if (cfgUrl != null && !cfgUrl.isEmpty()) {
+          ignite = Ignition.start(new URL(cfgUrl));
+        } else {
+          IgniteConfiguration conf = new IgniteConfiguration();
+
+          conf.setClientMode(Boolean.parseBoolean(getProperty(IGNITE_CLIENT_MODE)));
+
+          TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder();
+          ipFinder.setAddresses(getAddresses());
+
+          TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+          discoSpi.setIpFinder(ipFinder);
+          conf.setDiscoverySpi(discoSpi);
+
+          conf.setPeerClassLoadingEnabled(
+              Boolean.parseBoolean(getProperty(IGNITE_PEER_CLASS_LOADING_ENABLED)));
+
+          ignite = Ignition.start(conf);
+        }
+
+        initEx = null;
+      } catch (Exception e) {
+        initEx = e;
+      }
+    }
+    return ignite;
+  }
+
+  /**
+   * Defines the {@code _binder} map inside the REPL and uses it to hand the Ignite node over
+   * to Scala code as the {@code ignite} value.
+   */
+  private void initIgnite() {
+    imain.interpret("@transient var _binder = new java.util.HashMap[String, Object]()");
+
+    @SuppressWarnings("unchecked")
+    Map<String, Object> binder = (Map<String, Object>) getValue("_binder");
+
+    if (binder == null) {
+      // The REPL could not define the binder; report it on interpret() like other init errors.
+      initEx = new IllegalStateException("Failed to define _binder in the Scala interpreter");
+      return;
+    }
+
+    if (getIgnite() != null) {
+      binder.put("ignite", ignite);
+
+      imain.interpret("@transient val ignite = "
+          + "_binder.get(\"ignite\")"
+          + ".asInstanceOf[org.apache.ignite.Ignite]");
+    }
+  }
+
+  /**
+   * Stops the Ignite node and the Scala REPL and forgets any stored initialization error.
+   * The REPL is closed even if stopping the node fails.
+   */
+  @Override
+  public void close() {
+    initEx = null;
+
+    try {
+      if (ignite != null) {
+        ignite.close();
+      }
+    } finally {
+      ignite = null;
+
+      if (imain != null) {
+        imain.close();
+        imain = null;
+      }
+    }
+  }
+
+  /**
+   * Parses the {@code ignite.addresses} property.
+   *
+   * @return addresses with surrounding whitespace removed and empty entries dropped;
+   *     an empty list if the property is not set.
+   */
+  private List<String> getAddresses() {
+    String prop = getProperty(IGNITE_ADDRESSES);
+
+    if (prop == null || prop.isEmpty()) {
+      return Collections.emptyList();
+    }
+
+    String[] tokens = prop.split(",");
+    List<String> addresses = new ArrayList<>(tokens.length);
+
+    for (String token : tokens) {
+      // Tolerate "host1:port, host2:port" and stray commas.
+      String address = token.trim();
+
+      if (!address.isEmpty()) {
+        addresses.add(address);
+      }
+    }
+
+    return addresses;
+  }
+
+  /**
+   * Interprets a paragraph of Scala code.
+   *
+   * @param line code to run; may span several lines.
+   * @param context interpreter context (not used here).
+   * @return an error result if initialization failed, an empty success result for blank input,
+   *     otherwise the result of running the code line by line.
+   */
+  @Override
+  public InterpreterResult interpret(String line, InterpreterContext context) {
+    if (initEx != null) {
+      return IgniteInterpreterUtils.buildErrorResult(initEx);
+    }
+
+    if (line == null || line.trim().length() == 0) {
+      return new InterpreterResult(Code.SUCCESS);
+    }
+
+    return interpret(line.split("\n"));
+  }
+
+  /** Cancellation is not supported; this method does nothing. */
+  @Override
+  public void cancel(InterpreterContext context) {
+  }
+
+  /**
+   * Feeds the lines to the REPL one by one. Lines that the REPL reports as incomplete are
+   * accumulated and resubmitted together with the next line.
+   *
+   * @param lines lines of code.
+   * @return result carrying the captured output, or an error/incomplete result.
+   */
+  private InterpreterResult interpret(String[] lines) {
+    // A trailing print("") statement is appended after the user's lines.
+    // NOTE(review): presumably this forces the last user statement to complete - confirm.
+    String[] linesToRun = new String[lines.length + 1];
+    System.arraycopy(lines, 0, linesToRun, 0, lines.length);
+    linesToRun[lines.length] = "print(\"\")";
+
+    Console.setOut(out);
+    out.reset();
+    Code code = null;
+
+    String incomplete = "";
+    for (String s : linesToRun) {
+      try {
+        code = getResultCode(imain.interpret(incomplete + s));
+      } catch (Exception e) {
+        logger.info("Interpreter exception", e);
+        return new InterpreterResult(Code.ERROR, InterpreterUtils.getMostRelevantMessage(e));
+      }
+
+      if (code == Code.ERROR) {
+        return new InterpreterResult(code, out.toString());
+      } else if (code == Code.INCOMPLETE) {
+        incomplete += s + '\n';
+      } else {
+        incomplete = "";
+      }
+    }
+
+    if (code == Code.INCOMPLETE) {
+      return new InterpreterResult(code, "Incomplete expression");
+    } else {
+      return new InterpreterResult(code, out.toString());
+    }
+  }
+
+  /**
+   * Maps a Scala REPL result to a Zeppelin result code; anything other than success or
+   * incomplete is treated as an error.
+   */
+  private Code getResultCode(Result res) {
+    if (res instanceof scala.tools.nsc.interpreter.Results.Success$) {
+      return Code.SUCCESS;
+    } else if (res instanceof scala.tools.nsc.interpreter.Results.Incomplete$) {
+      return Code.INCOMPLETE;
+    } else {
+      return Code.ERROR;
+    }
+  }
+
+  @Override
+  public FormType getFormType() {
+    return FormType.NATIVE;
+  }
+
+  /** Progress is not tracked; always returns 0. */
+  @Override
+  public int getProgress(InterpreterContext context) {
+    return 0;
+  }
+
+  /** Code completion is not implemented; always returns an empty list. */
+  @Override
+  public List<String> completion(String buf, int cursor) {
+    return new LinkedList<>();
+  }
+
+  /** Returns a FIFO scheduler whose name is built from the class name and this hash code. */
+  @Override
+  public Scheduler getScheduler() {
+    return SchedulerFactory.singleton().createOrGetFIFOScheduler(
+        IgniteInterpreter.class.getName() + this.hashCode());
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/1020b798/ignite/src/main/java/org/apache/zeppelin/ignite/IgniteInterpreterUtils.java
----------------------------------------------------------------------
diff --git a/ignite/src/main/java/org/apache/zeppelin/ignite/IgniteInterpreterUtils.java b/ignite/src/main/java/org/apache/zeppelin/ignite/IgniteInterpreterUtils.java
new file mode 100644
index 0000000..74a0a7f
--- /dev/null
+++ b/ignite/src/main/java/org/apache/zeppelin/ignite/IgniteInterpreterUtils.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.ignite;
+
+import org.apache.zeppelin.interpreter.InterpreterResult;
+
+/**
+ * Apache Ignite interpreter utils.
+ */
+public class IgniteInterpreterUtils {
+  /**
+   * Builds error result from given exception. The result message consists of the exception
+   * message followed by the non-null messages of its causes, one per line.
+   *
+   * @param e Exception.
+   * @return result with the {@code ERROR} code.
+   */
+  public static InterpreterResult buildErrorResult(Throwable e) {
+    String msg = e.getMessage();
+
+    // new StringBuilder((String) null) throws NullPointerException, which would hide the
+    // real error for exceptions without a message; fall back to toString() in that case.
+    StringBuilder sb = new StringBuilder(msg != null ? msg : e.toString());
+
+    for (Throwable cause = e.getCause(); cause != null; cause = cause.getCause()) {
+      String errMsg = cause.getMessage();
+
+      if (errMsg != null) {
+        sb.append('\n').append(errMsg);
+      }
+    }
+
+    return new InterpreterResult(InterpreterResult.Code.ERROR, sb.toString());
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/1020b798/ignite/src/main/java/org/apache/zeppelin/ignite/IgniteSqlInterpreter.java
----------------------------------------------------------------------
diff --git a/ignite/src/main/java/org/apache/zeppelin/ignite/IgniteSqlInterpreter.java b/ignite/src/main/java/org/apache/zeppelin/ignite/IgniteSqlInterpreter.java
new file mode 100644
index 0000000..5d77e7d
--- /dev/null
+++ b/ignite/src/main/java/org/apache/zeppelin/ignite/IgniteSqlInterpreter.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.ignite;
+
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.InterpreterResult.Code;
+import org.apache.zeppelin.scheduler.Scheduler;
+import org.apache.zeppelin.scheduler.SchedulerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.Statement;
+import java.sql.SQLException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Apache Ignite SQL interpreter (http://ignite.incubator.apache.org/).
+ *
+ * Use {@code ignite.jdbc.url} property to set up JDBC connection URL.
+ * URL has the following pattern:
+ * {@code jdbc:ignite://<hostname>:<port>/<cache_name>}
+ *
+ * <ul>
+ * <li>Hostname is required.</li>
+ * <li>If port is not defined, 11211 is used (default for Ignite client).</li>
+ * <li>Leave cache_name empty if you are connecting to a default cache.
+ * Note that the cache name is case sensitive.</li>
+ * </ul>
+ */
+public class IgniteSqlInterpreter extends Interpreter {
+ private static final String IGNITE_JDBC_DRIVER_NAME = "org.apache.ignite.IgniteJdbcDriver";
+
+ static final String IGNITE_JDBC_URL = "ignite.jdbc.url";
+
+ static {
+ Interpreter.register(
+ "ignitesql",
+ "ignite",
+ IgniteSqlInterpreter.class.getName(),
+ new InterpreterPropertyBuilder()
+ .add(IGNITE_JDBC_URL, "jdbc:ignite://localhost:11211/", "Ignite JDBC connection URL.")
+ .build());
+ }
+
+ private Logger logger = LoggerFactory.getLogger(IgniteSqlInterpreter.class);
+
+ private Connection conn;
+
+ private Throwable connEx;
+
+ private Statement curStmt;
+
+ public IgniteSqlInterpreter(Properties property) {
+ super(property);
+ }
+
+ @Override
+ public void open() {
+ try {
+ Class.forName(IGNITE_JDBC_DRIVER_NAME);
+ } catch (ClassNotFoundException e) {
+ logger.error("Can't open connection", e);
+ connEx = e;
+ return;
+ }
+
+ try {
+ logger.info("connect to " + getProperty(IGNITE_JDBC_URL));
+
+ conn = DriverManager.getConnection(getProperty(IGNITE_JDBC_URL));
+ connEx = null;
+
+ logger.info("Successfully created JDBC connection");
+ } catch (SQLException e) {
+ logger.error("Can't open connection: ", e);
+ connEx = e;
+ }
+ }
+
+ @Override
+ public void close() {
+ try {
+ if (conn != null) {
+ conn.close();
+ }
+ } catch (SQLException e) {
+ throw new InterpreterException(e);
+ } finally {
+ conn = null;
+ connEx = null;
+ }
+ }
+
+ @Override
+ public InterpreterResult interpret(String st, InterpreterContext context) {
+ if (connEx != null) {
+ return new InterpreterResult(Code.ERROR, connEx.getMessage());
+ }
+
+ StringBuilder msg = new StringBuilder("%table ");
+
+ try (Statement stmt = conn.createStatement()) {
+
+ curStmt = stmt;
+
+ try (ResultSet res = stmt.executeQuery(st)) {
+ ResultSetMetaData md = res.getMetaData();
+
+ for (int i = 1; i <= md.getColumnCount(); i++) {
+ if (i > 1) {
+ msg.append('\t');
+ }
+
+ msg.append(md.getColumnName(i));
+ }
+
+ msg.append('\n');
+
+ while (res.next()) {
+ for (int i = 1; i <= md.getColumnCount(); i++) {
+ msg.append(res.getString(i));
+
+ if (i != md.getColumnCount()) {
+ msg.append('\t');
+ }
+ }
+
+ msg.append('\n');
+ }
+ }
+ } catch (Exception e) {
+ return IgniteInterpreterUtils.buildErrorResult(e);
+ } finally {
+ curStmt = null;
+ }
+
+ return new InterpreterResult(Code.SUCCESS, msg.toString());
+ }
+
+ @Override
+ public void cancel(InterpreterContext context) {
+ if (curStmt != null) {
+ try {
+ curStmt.cancel();
+ } catch (SQLException e) {
+ // No-op.
+ } finally {
+ curStmt = null;
+ }
+ }
+ }
+
+ @Override
+ public FormType getFormType() {
+ return FormType.SIMPLE;
+ }
+
+ @Override
+ public int getProgress(InterpreterContext context) {
+ return 0;
+ }
+
+ @Override
+ public Scheduler getScheduler() {
+ return SchedulerFactory.singleton().createOrGetFIFOScheduler(
+ IgniteSqlInterpreter.class.getName() + this.hashCode());
+ }
+
+ @Override
+ public List<String> completion(String buf, int cursor) {
+ return new LinkedList<>();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/1020b798/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteInterpreterTest.java
----------------------------------------------------------------------
diff --git a/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteInterpreterTest.java b/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteInterpreterTest.java
new file mode 100644
index 0000000..3d3f50b
--- /dev/null
+++ b/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteInterpreterTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.ignite;
+
+import java.util.Collections;
+import java.util.Properties;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for Apache Ignite interpreter ({@link IgniteInterpreter}).
+ *
+ * Each test starts a local Ignite node named "test" and an interpreter configured to start
+ * its own non-client node.
+ */
+public class IgniteInterpreterTest {
+  private static final String HOST = "127.0.0.1:47500..47509";
+
+  private static final InterpreterContext INTP_CONTEXT =
+      new InterpreterContext(null, null, null, null, null, null, null);
+
+  private IgniteInterpreter intp;
+  private Ignite ignite;
+
+  @Before
+  public void setUp() {
+    TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder();
+    ipFinder.setAddresses(Collections.singletonList(HOST));
+
+    TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+    discoSpi.setIpFinder(ipFinder);
+
+    IgniteConfiguration cfg = new IgniteConfiguration();
+    cfg.setDiscoverySpi(discoSpi);
+
+    cfg.setGridName("test");
+
+    ignite = Ignition.start(cfg);
+
+    Properties props = new Properties();
+    // NOTE(review): IgniteInterpreter does not read the JDBC URL property in the code of this
+    // commit, and the "jdbc:intp://" scheme looks like a rename leftover - consider removing.
+    props.setProperty(IgniteSqlInterpreter.IGNITE_JDBC_URL, "jdbc:intp://localhost:11211/person");
+    props.setProperty(IgniteInterpreter.IGNITE_CLIENT_MODE, "false");
+    props.setProperty(IgniteInterpreter.IGNITE_PEER_CLASS_LOADING_ENABLED, "false");
+
+    intp = new IgniteInterpreter(props);
+    intp.open();
+  }
+
+  /**
+   * Stops the interpreter and then the test node. Both are guarded so that a failed
+   * {@code setUp} or a failing close does not leak the other node or mask the original error.
+   */
+  @After
+  public void tearDown() {
+    try {
+      if (intp != null) {
+        intp.close();
+      }
+    } finally {
+      if (ignite != null) {
+        ignite.close();
+      }
+    }
+  }
+
+  /** A valid Scala snippet succeeds and echoes the cluster size seen through {@code ignite}. */
+  @Test
+  public void testInterpret() {
+    String sizeVal = "size";
+
+    InterpreterResult result = intp.interpret("import org.apache.ignite.IgniteCache\n" +
+        "val " + sizeVal + " = ignite.cluster().nodes().size()", INTP_CONTEXT);
+
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertTrue(result.message().contains(sizeVal + ": Int = " + ignite.cluster().nodes().size()));
+  }
+
+  /** Input that is not valid Scala yields an error result. */
+  @Test
+  public void testInterpretInvalidInput() {
+    InterpreterResult result = intp.interpret("invalid input", INTP_CONTEXT);
+
+    assertEquals(InterpreterResult.Code.ERROR, result.code());
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/1020b798/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteSqlInterpreterTest.java
----------------------------------------------------------------------
diff --git a/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteSqlInterpreterTest.java b/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteSqlInterpreterTest.java
new file mode 100644
index 0000000..de1e760
--- /dev/null
+++ b/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteSqlInterpreterTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.ignite;
+
+import java.util.Collections;
+import java.util.Properties;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.InterpreterResult.Code;
+import org.apache.zeppelin.interpreter.InterpreterResult.Type;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for Apache Ignite SQL interpreter ({@link IgniteSqlInterpreter}).
+ */
+public class IgniteSqlInterpreterTest {
+   /** Address range handed to the discovery IP finder of the test node. */
+   private static final String HOST = "127.0.0.1:47500..47509";
+
+   /** Interpreter context shared by all tests; every constructor argument is {@code null}. */
+   private static final InterpreterContext INTP_CONTEXT =
+       new InterpreterContext(null, null, null, null, null, null, null);
+
+   private Ignite ignite;
+   private IgniteSqlInterpreter intp;
+
+   /**
+    * Starts a local Ignite node named "test", fills a "person" cache with two entries and opens
+    * the SQL interpreter against it.
+    */
+   @Before
+   public void setUp() {
+     // Seed TCP discovery with the addresses listed in HOST.
+     TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder();
+     ipFinder.setAddresses(Collections.singletonList(HOST));
+
+     TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+     discoSpi.setIpFinder(ipFinder);
+
+     IgniteConfiguration cfg = new IgniteConfiguration();
+     cfg.setDiscoverySpi(discoSpi);
+     cfg.setGridName("test");
+
+     ignite = Ignition.start(cfg);
+
+     Properties props = new Properties();
+     props.setProperty(IgniteSqlInterpreter.IGNITE_JDBC_URL, "jdbc:ignite://localhost:11211/person");
+     props.setProperty(IgniteInterpreter.IGNITE_CLIENT_MODE, "false");
+
+     intp = new IgniteSqlInterpreter(props);
+
+     // Register Person as an indexed type so its annotated fields can be queried with SQL.
+     CacheConfiguration<Integer, Person> cacheConf = new CacheConfiguration<>();
+     cacheConf.setIndexedTypes(Integer.class, Person.class);
+     cacheConf.setName("person");
+
+     IgniteCache<Integer, Person> cache = ignite.createCache(cacheConf);
+     cache.put(1, new Person("sun", 100));
+     cache.put(2, new Person("moon", 50));
+     // Sanity check: the data must be readable before the interpreter is opened.
+     assertEquals("moon", cache.get(2).getName());
+
+     intp.open();
+   }
+
+   /**
+    * Closes the interpreter and stops the Ignite node.
+    *
+    * JUnit runs {@code @After} methods even when a {@code @Before} method failed, so either field
+    * may still be {@code null}. The node is stopped in a {@code finally} block so that a failure
+    * while closing the interpreter cannot leave it running.
+    */
+   @After
+   public void tearDown() {
+     try {
+       if (intp != null) {
+         intp.close();
+       }
+     } finally {
+       if (ignite != null) {
+         ignite.close();
+       }
+     }
+   }
+
+   /** Runs a valid query and checks the result code, the result type and the rendered table. */
+   @Test
+   public void testSql() {
+     // ORDER BY pins the row order; without it the exact table asserted below is order-dependent.
+     InterpreterResult result = intp.interpret(
+         "select name, age from person where age > 10 order by age desc", INTP_CONTEXT);
+
+     assertEquals(Code.SUCCESS, result.code());
+     assertEquals(Type.TABLE, result.type());
+     assertEquals("NAME\tAGE\nsun\t100\nmoon\t50\n", result.message());
+   }
+
+   /** Checks that a malformed statement ("hrom" instead of "from") yields an ERROR result code. */
+   @Test
+   public void testInvalidSql() {
+     InterpreterResult result = intp.interpret("select * hrom person", INTP_CONTEXT);
+
+     assertEquals(Code.ERROR, result.code());
+   }
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/1020b798/ignite/src/test/java/org/apache/zeppelin/ignite/Person.java
----------------------------------------------------------------------
diff --git a/ignite/src/test/java/org/apache/zeppelin/ignite/Person.java b/ignite/src/test/java/org/apache/zeppelin/ignite/Person.java
new file mode 100644
index 0000000..e0b4367
--- /dev/null
+++ b/ignite/src/test/java/org/apache/zeppelin/ignite/Person.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.ignite;
+
+import java.io.Serializable;
+
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+
+/**
+ * Simple value type that the SQL interpreter test stores in an Ignite cache. Both fields are
+ * annotated with {@link QuerySqlField}.
+ */
+public class Person implements Serializable {
+   /** Explicit version so the serialized form does not depend on a compiler-generated id. */
+   private static final long serialVersionUID = 1L;
+
+   @QuerySqlField
+   private String name;
+
+   @QuerySqlField
+   private int age;
+
+   /**
+    * @param name person name
+    * @param age person age
+    */
+   public Person(String name, int age) {
+     this.name = name;
+     this.age = age;
+   }
+
+   /** @return the person name */
+   public String getName() {
+     return name;
+   }
+
+   /** @param name the new person name */
+   public void setName(String name) {
+     this.name = name;
+   }
+
+   /** @return the person age */
+   public int getAge() {
+     return age;
+   }
+
+   /** @param age the new person age */
+   public void setAge(int age) {
+     this.age = age;
+   }
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/1020b798/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index e3a7ca0..ecd819f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -93,6 +93,7 @@
<module>hive</module>
<module>tajo</module>
<module>flink</module>
+ <module>ignite</module>
<module>zeppelin-web</module>
<module>zeppelin-server</module>
<module>zeppelin-distribution</module>
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/1020b798/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
index 95d77b7..6bc8a6c 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
@@ -397,7 +397,9 @@ public class ZeppelinConfiguration extends XMLConfiguration {
+ "org.apache.zeppelin.shell.ShellInterpreter,"
+ "org.apache.zeppelin.hive.HiveInterpreter,"
+ "org.apache.zeppelin.tajo.TajoInterpreter,"
- + "org.apache.zeppelin.flink.FlinkInterpreter"),
+ + "org.apache.zeppelin.flink.FlinkInterpreter,"
+ + "org.apache.zeppelin.ignite.IgniteInterpreter,"
+ + "org.apache.zeppelin.ignite.IgniteSqlInterpreter"),
ZEPPELIN_INTERPRETER_DIR("zeppelin.interpreter.dir", "interpreter"),
ZEPPELIN_ENCODING("zeppelin.encoding", "UTF-8"),
ZEPPELIN_NOTEBOOK_DIR("zeppelin.notebook.dir", "notebook"),