You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by bz...@apache.org on 2015/08/07 01:20:46 UTC
incubator-zeppelin git commit: ZEPPELIN-189: Add Apache Geode
Interpreter for Zeppelin
Repository: incubator-zeppelin
Updated Branches:
refs/heads/master 9186b2186 -> f04425bc9
ZEPPELIN-189: Add Apache Geode Interpreter for Zeppelin
Syntax look like this:
%geode.oql
SELECT * FROM /regionOne o WHERE o.field1 > 146 AND o.field2 LIKE '%some%';
Author: tzolov <ch...@gmail.com>
Closes #173 from tzolov/ZEPPELIN-189 and squashes the following commits:
c5eef90 [tzolov] ZEPPELIN-189: Close previous connection before opening new one
ab404e3 [tzolov] ZEPPELIN-189: Add GeodeOqlInterpreter to ZeppelinConfiguration's default interpreters list
0ff81c9 [tzolov] ZEPPELIN-189: Add Max number of OQL result to display
437c091 [tzolov] ZEPPELIN-189: Improve javadoc documentation
9b701f9 [tzolov] ZEPPELIN-189: Clean pom formatting and remove obsolete dependencies
c410e2e [tzolov] ZEPPELIN-189: Fix wrong interpreter name in pom
cd6294b [tzolov] ZEPPELIN-189: Handle responses containing reserved characters
14b8a37 [tzolov] ZEPPELIN-189: Add missing license tag
ef7defc [tzolov] ZEPPELIN-189: Add Apache Geode Interpreter for Zeppelin
Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/f04425bc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/f04425bc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/f04425bc
Branch: refs/heads/master
Commit: f04425bc99866c7386533103c403adcfafb08ea9
Parents: 9186b21
Author: tzolov <ch...@gmail.com>
Authored: Thu Aug 6 08:45:17 2015 +0200
Committer: Alexander Bezzubov <bz...@apache.org>
Committed: Fri Aug 7 08:20:21 2015 +0900
----------------------------------------------------------------------
conf/zeppelin-site.xml.template | 2 +-
geode/pom.xml | 151 +++++++++
.../zeppelin/geode/GeodeOqlInterpreter.java | 319 +++++++++++++++++++
.../zeppelin/geode/GeodeOqlInterpreterTest.java | 181 +++++++++++
pom.xml | 1 +
.../zeppelin/conf/ZeppelinConfiguration.java | 3 +-
6 files changed, 655 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/f04425bc/conf/zeppelin-site.xml.template
----------------------------------------------------------------------
diff --git a/conf/zeppelin-site.xml.template b/conf/zeppelin-site.xml.template
index 13e4d1d..f48a960 100644
--- a/conf/zeppelin-site.xml.template
+++ b/conf/zeppelin-site.xml.template
@@ -72,7 +72,7 @@
<property>
<name>zeppelin.interpreters</name>
- <value>org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.hive.HiveInterpreter,org.apache.zeppelin.tajo.TajoInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,org.apache.zeppelin.lens.LensInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter,org.apache.zeppelin.cassandra.CassandraInterpreter</value>
+ <value>org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.hive.HiveInterpreter,org.apache.zeppelin.tajo.TajoInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,org.apache.zeppelin.lens.LensInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter,org.apache.zeppelin.cassandra.CassandraInterpreter,org.apache.zeppelin.geode.GeodeOqlInterpreter</value>
<description>Comma separated interpreter configurations. First interpreter become a default</description>
</property>
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/f04425bc/geode/pom.xml
----------------------------------------------------------------------
diff --git a/geode/pom.xml b/geode/pom.xml
new file mode 100644
index 0000000..3fa7cb3
--- /dev/null
+++ b/geode/pom.xml
@@ -0,0 +1,151 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>zeppelin</artifactId>
+ <groupId>org.apache.zeppelin</groupId>
+ <version>0.6.0-incubating-SNAPSHOT</version>
+ </parent>
+
+ <groupId>org.apache.zeppelin</groupId>
+ <artifactId>zeppelin-geode</artifactId>
+ <packaging>jar</packaging>
+ <version>0.6.0-incubating-SNAPSHOT</version>
+ <name>Zeppelin: Apache Geode interpreter</name>
+ <url>http://geode.incubator.apache.org/</url>
+
+ <properties>
+ <geode.version>1.0.0-incubating-SNAPSHOT</geode.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.zeppelin</groupId>
+ <artifactId>zeppelin-interpreter</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.geode</groupId>
+ <artifactId>gemfire-core</artifactId>
+ <version>${geode.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-exec</artifactId>
+ <version>1.1</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <version>1.9.5</version>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-deploy-plugin</artifactId>
+ <version>2.7</version>
+ <configuration>
+ <skip>true</skip>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <artifactId>maven-enforcer-plugin</artifactId>
+ <version>1.3.1</version>
+ <executions>
+ <execution>
+ <id>enforce</id>
+ <phase>none</phase>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <version>2.8</version>
+ <executions>
+ <execution>
+ <id>copy-dependencies</id>
+ <phase>package</phase>
+ <goals>
+ <goal>copy-dependencies</goal>
+ </goals>
+ <configuration>
+ <outputDirectory>${project.build.directory}/../../interpreter/geode</outputDirectory>
+ <overWriteReleases>false</overWriteReleases>
+ <overWriteSnapshots>false</overWriteSnapshots>
+ <overWriteIfNewer>true</overWriteIfNewer>
+ <includeScope>runtime</includeScope>
+ </configuration>
+ </execution>
+ <execution>
+ <id>copy-artifact</id>
+ <phase>package</phase>
+ <goals>
+ <goal>copy</goal>
+ </goals>
+ <configuration>
+ <outputDirectory>${project.build.directory}/../../interpreter/geode</outputDirectory>
+ <overWriteReleases>false</overWriteReleases>
+ <overWriteSnapshots>false</overWriteSnapshots>
+ <overWriteIfNewer>true</overWriteIfNewer>
+ <includeScope>runtime</includeScope>
+ <artifactItems>
+ <artifactItem>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>${project.artifactId}</artifactId>
+ <version>${project.version}</version>
+ <type>${project.packaging}</type>
+ </artifactItem>
+ </artifactItems>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/f04425bc/geode/src/main/java/org/apache/zeppelin/geode/GeodeOqlInterpreter.java
----------------------------------------------------------------------
diff --git a/geode/src/main/java/org/apache/zeppelin/geode/GeodeOqlInterpreter.java b/geode/src/main/java/org/apache/zeppelin/geode/GeodeOqlInterpreter.java
new file mode 100644
index 0000000..6f6b440
--- /dev/null
+++ b/geode/src/main/java/org/apache/zeppelin/geode/GeodeOqlInterpreter.java
@@ -0,0 +1,319 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.zeppelin.geode;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.InterpreterResult.Code;
+import org.apache.zeppelin.scheduler.Scheduler;
+import org.apache.zeppelin.scheduler.SchedulerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.gemstone.gemfire.cache.client.ClientCache;
+import com.gemstone.gemfire.cache.client.ClientCacheFactory;
+import com.gemstone.gemfire.cache.query.QueryService;
+import com.gemstone.gemfire.cache.query.SelectResults;
+import com.gemstone.gemfire.cache.query.Struct;
+import com.gemstone.gemfire.pdx.PdxInstance;
+
+/**
+ * Apache Geode OQL Interpreter (http://geode.incubator.apache.org)
+ *
+ * <ul>
+ * <li>{@code geode.locator.host} - The Geode Locator {@code <HOST>} to connect to.</li>
+ * <li>{@code geode.locator.port} - The Geode Locator {@code <PORT>} to connect to.</li>
+ * <li>{@code geode.max.result} - Max number of OQL result to display.</li>
+ * </ul>
+ * <p>
+ * Sample usages: <br/>
+ * {@code %geode.oql} <br/>
+ * {@code SELECT * FROM /regionEmployee e WHERE e.companyId > 95} <br/>
+ * {@code SELECT * FROM /regionEmployee ORDER BY employeeId} <br/>
+ * {@code
+ * SELECT * FROM /regionEmployee
+ * WHERE companyId IN SET(1, 3, 7) OR lastName IN SET('NameA', 'NameB')
+ * } <br/>
+ * {@code
+ * SELECT e.employeeId, c.id as companyId FROM /regionEmployee e, /regionCompany c
+ * WHERE e.companyId = c.id
+ * }
+ * </p>
+ * <p>
+ * OQL specification and sample queries:
+ * http://geode-docs.cfapps.io/docs/getting_started/querying_quick_reference.html
+ * </p>
+ * <p>
+ * When the Zeppelin server is collocated with Geode Shell (gfsh) one can use the %sh interpreter to
+ * run Geode shell commands: <br/>
+ * {@code
+ * %sh
+ * source /etc/geode/conf/geode-env.sh
+ * gfsh << EOF
+ * connect --locator=ambari.localdomain[10334]
+ * destroy region --name=/regionEmployee
+ * create region --name=regionEmployee --type=REPLICATE
+ * exit;
+ * EOF
+ *}
+ * </p>
+ * <p>
+ * Known issue:http://gemfire.docs.pivotal.io/bugnotes/KnownIssuesGemFire810.html #43673 Using query
+ * "select * from /exampleRegion.entrySet" fails in a client-server topology and/or in a
+ * PartitionedRegion.
+ * </p>
+ */
+public class GeodeOqlInterpreter extends Interpreter {
+
+ private Logger logger = LoggerFactory.getLogger(GeodeOqlInterpreter.class);
+
+ public static final String DEFAULT_PORT = "10334";
+ public static final String DEFAULT_HOST = "localhost";
+ public static final String DEFAULT_MAX_RESULT = "1000";
+
+ private static final char NEWLINE = '\n';
+ private static final char TAB = '\t';
+ private static final char WHITESPACE = ' ';
+
+ private static final String TABLE_MAGIC_TAG = "%table ";
+
+ public static final String LOCATOR_HOST = "geode.locator.host";
+ public static final String LOCATOR_PORT = "geode.locator.port";
+ public static final String MAX_RESULT = "geode.max.result";
+
+ static {
+ Interpreter.register(
+ "oql",
+ "geode",
+ GeodeOqlInterpreter.class.getName(),
+ new InterpreterPropertyBuilder().add(LOCATOR_HOST, DEFAULT_HOST, "The Geode Locator Host.")
+ .add(LOCATOR_PORT, DEFAULT_PORT, "The Geode Locator Port")
+ .add(MAX_RESULT, DEFAULT_MAX_RESULT, "Max number of OQL result to display.").build());
+ }
+
+ private ClientCache clientCache = null;
+ private QueryService queryService = null;
+ private Exception exceptionOnConnect;
+ private int maxResult;
+
+ public GeodeOqlInterpreter(Properties property) {
+ super(property);
+ }
+
+ protected ClientCache getClientCache() {
+
+ String locatorHost = getProperty(LOCATOR_HOST);
+ int locatorPort = Integer.valueOf(getProperty(LOCATOR_PORT));
+
+ ClientCache clientCache =
+ new ClientCacheFactory().addPoolLocator(locatorHost, locatorPort).create();
+
+ return clientCache;
+ }
+
+ @Override
+ public void open() {
+ logger.info("Geode open connection called!");
+
+ // Close the previous open connections.
+ close();
+
+ try {
+ maxResult = Integer.valueOf(getProperty(MAX_RESULT));
+
+ clientCache = getClientCache();
+ queryService = clientCache.getQueryService();
+
+ exceptionOnConnect = null;
+ logger.info("Successfully created Geode connection");
+ } catch (Exception e) {
+ logger.error("Cannot open connection", e);
+ exceptionOnConnect = e;
+ }
+ }
+
+ @Override
+ public void close() {
+ try {
+ if (clientCache != null) {
+ clientCache.close();
+ }
+
+ if (queryService != null) {
+ queryService.closeCqs();
+ }
+
+ } catch (Exception e) {
+ logger.error("Cannot close connection", e);
+ } finally {
+ clientCache = null;
+ queryService = null;
+ exceptionOnConnect = null;
+ }
+ }
+
+ private InterpreterResult executeOql(String oql) {
+ try {
+
+ if (getExceptionOnConnect() != null) {
+ return new InterpreterResult(Code.ERROR, getExceptionOnConnect().getMessage());
+ }
+
+ @SuppressWarnings("unchecked")
+ SelectResults<Object> results =
+ (SelectResults<Object>) getQueryService().newQuery(oql).execute();
+
+ StringBuilder msg = new StringBuilder(TABLE_MAGIC_TAG);
+ boolean isTableHeaderSet = false;
+
+ Iterator<Object> iterator = results.iterator();
+ int rowDisplayCount = 0;
+
+ while (iterator.hasNext() && (rowDisplayCount < getMaxResult())) {
+
+ Object entry = iterator.next();
+ rowDisplayCount++;
+
+ if (entry instanceof Number) {
+ handleNumberEntry(isTableHeaderSet, entry, msg);
+ } else if (entry instanceof Struct) {
+ handleStructEntry(isTableHeaderSet, entry, msg);
+ } else if (entry instanceof PdxInstance) {
+ handlePdxInstanceEntry(isTableHeaderSet, entry, msg);
+ } else {
+ handleUnsupportedTypeEntry(isTableHeaderSet, entry, msg);
+ }
+
+ isTableHeaderSet = true;
+ msg.append(NEWLINE);
+ }
+
+ return new InterpreterResult(Code.SUCCESS, msg.toString());
+
+ } catch (Exception ex) {
+ logger.error("Cannot run " + oql, ex);
+ return new InterpreterResult(Code.ERROR, ex.getMessage());
+ }
+ }
+
+ /**
+ * Zeppelin's %TABLE convention uses tab (\t) to delimit fields and new-line (\n) to delimit rows
+ * To complain with this convention we need to replace any occurrences of tab and/or newline
+ * characters in the content.
+ */
+ private String replaceReservedChars(String str) {
+
+ if (StringUtils.isBlank(str)) {
+ return str;
+ }
+
+ return str.replace(TAB, WHITESPACE).replace(NEWLINE, WHITESPACE);
+ }
+
+ private void handleStructEntry(boolean isHeaderSet, Object entry, StringBuilder msg) {
+ Struct struct = (Struct) entry;
+ if (!isHeaderSet) {
+ for (String titleName : struct.getStructType().getFieldNames()) {
+ msg.append(replaceReservedChars(titleName)).append(TAB);
+ }
+ msg.append(NEWLINE);
+ }
+
+ for (String titleName : struct.getStructType().getFieldNames()) {
+ msg.append(replaceReservedChars("" + struct.get(titleName))).append(TAB);
+ }
+ }
+
+ private void handlePdxInstanceEntry(boolean isHeaderSet, Object entry, StringBuilder msg) {
+ PdxInstance pdxEntry = (PdxInstance) entry;
+ if (!isHeaderSet) {
+ for (String titleName : pdxEntry.getFieldNames()) {
+ msg.append(replaceReservedChars(titleName)).append(TAB);
+ }
+ msg.append(NEWLINE);
+ }
+
+ for (String titleName : pdxEntry.getFieldNames()) {
+ msg.append(replaceReservedChars("" + pdxEntry.getField(titleName))).append(TAB);
+ }
+ }
+
+ private void handleNumberEntry(boolean isHeaderSet, Object entry, StringBuilder msg) {
+ if (!isHeaderSet) {
+ msg.append("Result").append(NEWLINE);
+ }
+ msg.append((Number) entry);
+ }
+
+ private void handleUnsupportedTypeEntry(boolean isHeaderSet, Object entry, StringBuilder msg) {
+ if (!isHeaderSet) {
+ msg.append("Unsuppoted Type").append(NEWLINE);
+ }
+ msg.append("" + entry);
+ }
+
+
+ @Override
+ public InterpreterResult interpret(String cmd, InterpreterContext contextInterpreter) {
+ logger.info("Run OQL command '{}'", cmd);
+ return executeOql(cmd);
+ }
+
+ @Override
+ public void cancel(InterpreterContext context) {
+ // Do nothing
+ }
+
+ @Override
+ public FormType getFormType() {
+ return FormType.SIMPLE;
+ }
+
+ @Override
+ public int getProgress(InterpreterContext context) {
+ return 0;
+ }
+
+ @Override
+ public Scheduler getScheduler() {
+ return SchedulerFactory.singleton().createOrGetFIFOScheduler(
+ GeodeOqlInterpreter.class.getName() + this.hashCode());
+ }
+
+ @Override
+ public List<String> completion(String buf, int cursor) {
+ return null;
+ }
+
+ public int getMaxResult() {
+ return maxResult;
+ }
+
+ // Test only
+ QueryService getQueryService() {
+ return this.queryService;
+ }
+
+ Exception getExceptionOnConnect() {
+ return this.exceptionOnConnect;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/f04425bc/geode/src/test/java/org/apache/zeppelin/geode/GeodeOqlInterpreterTest.java
----------------------------------------------------------------------
diff --git a/geode/src/test/java/org/apache/zeppelin/geode/GeodeOqlInterpreterTest.java b/geode/src/test/java/org/apache/zeppelin/geode/GeodeOqlInterpreterTest.java
new file mode 100644
index 0000000..78755eb
--- /dev/null
+++ b/geode/src/test/java/org/apache/zeppelin/geode/GeodeOqlInterpreterTest.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.zeppelin.geode;
+
+import static org.apache.zeppelin.geode.GeodeOqlInterpreter.*;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.Properties;
+
+import org.apache.zeppelin.interpreter.Interpreter.FormType;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.InterpreterResult.Code;
+import org.junit.Test;
+
+import com.gemstone.gemfire.cache.query.QueryService;
+import com.gemstone.gemfire.cache.query.SelectResults;
+import com.gemstone.gemfire.cache.query.Struct;
+import com.gemstone.gemfire.cache.query.internal.StructImpl;
+import com.gemstone.gemfire.cache.query.internal.types.StructTypeImpl;
+import com.gemstone.gemfire.pdx.PdxInstance;
+import com.gemstone.gemfire.pdx.internal.PdxInstanceImpl;
+import com.gemstone.gemfire.pdx.internal.PdxType;
+
+public class GeodeOqlInterpreterTest {
+
+ private static final String OQL_QUERY = "select * from /region";
+
+ private static Iterator<Object> asIterator(Object... items) {
+ return new ArrayList<Object>(Arrays.asList(items)).iterator();
+ }
+
+ @Test
+ public void testOpenCommandIndempotency() {
+
+ Properties properties = new Properties();
+ properties.put(LOCATOR_HOST, DEFAULT_HOST);
+ properties.put(LOCATOR_PORT, DEFAULT_PORT);
+ properties.put(MAX_RESULT, DEFAULT_MAX_RESULT);
+
+ GeodeOqlInterpreter spyGeodeOqlInterpreter = spy(new GeodeOqlInterpreter(properties));
+
+ // Ensure that an attempt to open new connection will clean any remaining connections
+ spyGeodeOqlInterpreter.open();
+ spyGeodeOqlInterpreter.open();
+ spyGeodeOqlInterpreter.open();
+
+ verify(spyGeodeOqlInterpreter, times(3)).open();
+ verify(spyGeodeOqlInterpreter, times(3)).close();
+ }
+
+ @Test
+ public void oqlNumberResponse() throws Exception {
+ testOql(asIterator(66, 67), "Result\n66\n67\n", 10);
+ testOql(asIterator(66, 67), "Result\n66\n", 1);
+ }
+
+ @Test
+ public void oqlStructResponse() throws Exception {
+ String[] fields = new String[] {"field1", "field2"};
+ Struct s1 = new StructImpl(new StructTypeImpl(fields), new String[] {"val11", "val12"});
+ Struct s2 = new StructImpl(new StructTypeImpl(fields), new String[] {"val21", "val22"});
+
+ testOql(asIterator(s1, s2), "field1\tfield2\t\nval11\tval12\t\nval21\tval22\t\n", 10);
+ testOql(asIterator(s1, s2), "field1\tfield2\t\nval11\tval12\t\n", 1);
+ }
+
+ @Test
+ public void oqlStructResponseWithReservedCharacters() throws Exception {
+ String[] fields = new String[] {"fi\teld1", "f\nield2"};
+ Struct s1 = new StructImpl(new StructTypeImpl(fields), new String[] {"v\nal\t1", "val2"});
+
+ testOql(asIterator(s1), "fi eld1\tf ield2\t\nv al 1\tval2\t\n", 10);
+ }
+
+ @Test
+ public void oqlPdxInstanceResponse() throws Exception {
+ ByteArrayInputStream bais = new ByteArrayInputStream("koza\tboza\n".getBytes());
+ PdxInstance pdx1 = new PdxInstanceImpl(new PdxType(), new DataInputStream(bais), 4);
+ PdxInstance pdx2 = new PdxInstanceImpl(new PdxType(), new DataInputStream(bais), 4);
+
+ testOql(asIterator(pdx1, pdx2), "\n\n\n", 10);
+ testOql(asIterator(pdx1, pdx2), "\n\n", 1);
+ }
+
+ private static class DummyUnspportedType {
+ @Override
+ public String toString() {
+ return "Unsupported Indeed";
+ }
+ }
+
+ @Test
+ public void oqlUnsupportedTypeResponse() throws Exception {
+ DummyUnspportedType unspported1 = new DummyUnspportedType();
+ DummyUnspportedType unspported2 = new DummyUnspportedType();
+
+ testOql(asIterator(unspported1, unspported2), "Unsuppoted Type\n" + unspported1.toString()
+ + "\n" + unspported1.toString() + "\n", 10);
+ }
+
+ private void testOql(Iterator<Object> queryResponseIterator, String expectedOutput, int maxResult)
+ throws Exception {
+
+ GeodeOqlInterpreter spyGeodeOqlInterpreter = spy(new GeodeOqlInterpreter(new Properties()));
+
+ QueryService mockQueryService = mock(QueryService.class, RETURNS_DEEP_STUBS);
+
+ when(spyGeodeOqlInterpreter.getQueryService()).thenReturn(mockQueryService);
+ when(spyGeodeOqlInterpreter.getMaxResult()).thenReturn(maxResult);
+
+ @SuppressWarnings("unchecked")
+ SelectResults<Object> mockResults = mock(SelectResults.class);
+
+ when(mockQueryService.newQuery(eq(OQL_QUERY)).execute()).thenReturn(mockResults);
+
+ when(mockResults.iterator()).thenReturn(queryResponseIterator);
+
+ InterpreterResult interpreterResult = spyGeodeOqlInterpreter.interpret(OQL_QUERY, null);
+
+ assertEquals(Code.SUCCESS, interpreterResult.code());
+ assertEquals(expectedOutput, interpreterResult.message());
+ }
+
+ @Test
+ public void oqlWithQueryException() throws Exception {
+
+ GeodeOqlInterpreter spyGeodeOqlInterpreter = spy(new GeodeOqlInterpreter(new Properties()));
+
+ when(spyGeodeOqlInterpreter.getExceptionOnConnect()).thenReturn(
+ new RuntimeException("Test Exception On Connect"));
+
+ InterpreterResult interpreterResult = spyGeodeOqlInterpreter.interpret(OQL_QUERY, null);
+
+ assertEquals(Code.ERROR, interpreterResult.code());
+ assertEquals("Test Exception On Connect", interpreterResult.message());
+ }
+
+ @Test
+ public void oqlWithExceptionOnConnect() throws Exception {
+
+ GeodeOqlInterpreter spyGeodeOqlInterpreter = spy(new GeodeOqlInterpreter(new Properties()));
+
+ when(spyGeodeOqlInterpreter.getQueryService()).thenThrow(
+ new RuntimeException("Expected Test Exception!"));
+
+ InterpreterResult interpreterResult = spyGeodeOqlInterpreter.interpret(OQL_QUERY, null);
+
+ assertEquals(Code.ERROR, interpreterResult.code());
+ assertEquals("Expected Test Exception!", interpreterResult.message());
+ }
+
+ @Test
+ public void testFormType() {
+ assertEquals(FormType.SIMPLE, new GeodeOqlInterpreter(new Properties()).getFormType());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/f04425bc/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 9e5f54e..ecdebdd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -91,6 +91,7 @@
<module>angular</module>
<module>shell</module>
<module>hive</module>
+ <module>geode</module>
<module>tajo</module>
<module>flink</module>
<module>ignite</module>
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/f04425bc/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
index 223dc70..e25d3c0 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
@@ -394,7 +394,8 @@ public class ZeppelinConfiguration extends XMLConfiguration {
+ "org.apache.zeppelin.ignite.IgniteInterpreter,"
+ "org.apache.zeppelin.ignite.IgniteSqlInterpreter,"
+ "org.apache.zeppelin.lens.LensInterpreter,"
- + "org.apache.zeppelin.cassandra.CassandraInterpreter"),
+ + "org.apache.zeppelin.cassandra.CassandraInterpreter,"
+ + "org.apache.zeppelin.geode.GeodeOqlInterpreter"),
ZEPPELIN_INTERPRETER_DIR("zeppelin.interpreter.dir", "interpreter"),
ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT("zeppelin.interpreter.connect.timeout", 30000),
ZEPPELIN_ENCODING("zeppelin.encoding", "UTF-8"),