You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by bz...@apache.org on 2015/08/07 01:20:46 UTC

incubator-zeppelin git commit: ZEPPELIN-189: Add Apache Geode Interpreter for Zeppelin

Repository: incubator-zeppelin
Updated Branches:
  refs/heads/master 9186b2186 -> f04425bc9


ZEPPELIN-189: Add Apache Geode Interpreter for Zeppelin

Syntax look like this:
%geode.oql
SELECT * FROM /regionOne o WHERE o.field1 > 146 AND o.field2 LIKE '%some%';

Author: tzolov <ch...@gmail.com>

Closes #173 from tzolov/ZEPPELIN-189 and squashes the following commits:

c5eef90 [tzolov] ZEPPELIN-189: Close previous connection before opening new one
ab404e3 [tzolov] ZEPPELIN-189: Add GeodeOqlInterpreter to ZeppelinConfiguration's default interpreters list
0ff81c9 [tzolov] ZEPPELIN-189: Add Max number of OQL result to display
437c091 [tzolov] ZEPPELIN-189: Improve javadoc documentation
9b701f9 [tzolov] ZEPPELIN-189: Clean pom formatting and remove obsolete dependencies
c410e2e [tzolov] ZEPPELIN-189: Fix wrong interpreter name in pom
cd6294b [tzolov] ZEPPELIN-189: Handle responses containing reserved characters
14b8a37 [tzolov] ZEPPELIN-189: Add missing license tag
ef7defc [tzolov] ZEPPELIN-189: Add Apache Geode Interpreter for Zeppelin


Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/f04425bc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/f04425bc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/f04425bc

Branch: refs/heads/master
Commit: f04425bc99866c7386533103c403adcfafb08ea9
Parents: 9186b21
Author: tzolov <ch...@gmail.com>
Authored: Thu Aug 6 08:45:17 2015 +0200
Committer: Alexander Bezzubov <bz...@apache.org>
Committed: Fri Aug 7 08:20:21 2015 +0900

----------------------------------------------------------------------
 conf/zeppelin-site.xml.template                 |   2 +-
 geode/pom.xml                                   | 151 +++++++++
 .../zeppelin/geode/GeodeOqlInterpreter.java     | 319 +++++++++++++++++++
 .../zeppelin/geode/GeodeOqlInterpreterTest.java | 181 +++++++++++
 pom.xml                                         |   1 +
 .../zeppelin/conf/ZeppelinConfiguration.java    |   3 +-
 6 files changed, 655 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/f04425bc/conf/zeppelin-site.xml.template
----------------------------------------------------------------------
diff --git a/conf/zeppelin-site.xml.template b/conf/zeppelin-site.xml.template
index 13e4d1d..f48a960 100644
--- a/conf/zeppelin-site.xml.template
+++ b/conf/zeppelin-site.xml.template
@@ -72,7 +72,7 @@
 
 <property>
   <name>zeppelin.interpreters</name>
-  <value>org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.hive.HiveInterpreter,org.apache.zeppelin.tajo.TajoInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,org.apache.zeppelin.lens.LensInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter,org.apache.zeppelin.cassandra.CassandraInterpreter</value>
+  <value>org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.hive.HiveInterpreter,org.apache.zeppelin.tajo.TajoInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,org.apache.zeppelin.lens.LensInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter,org.apache.zeppelin.cassandra.CassandraInterpreter,org.apache.zeppelin.geode.GeodeOqlInterpreter</value>
   <description>Comma separated interpreter configurations. First interpreter become a default</description>
 </property>
 

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/f04425bc/geode/pom.xml
----------------------------------------------------------------------
diff --git a/geode/pom.xml b/geode/pom.xml
new file mode 100644
index 0000000..3fa7cb3
--- /dev/null
+++ b/geode/pom.xml
@@ -0,0 +1,151 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <artifactId>zeppelin</artifactId>
+    <groupId>org.apache.zeppelin</groupId>
+    <version>0.6.0-incubating-SNAPSHOT</version>
+  </parent>
+
+  <groupId>org.apache.zeppelin</groupId>
+  <artifactId>zeppelin-geode</artifactId>
+  <packaging>jar</packaging>
+  <version>0.6.0-incubating-SNAPSHOT</version>
+  <name>Zeppelin: Apache Geode interpreter</name>
+  <url>http://geode.incubator.apache.org/</url>
+
+  <properties>
+    <geode.version>1.0.0-incubating-SNAPSHOT</geode.version>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.zeppelin</groupId>
+      <artifactId>zeppelin-interpreter</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.geode</groupId>
+      <artifactId>gemfire-core</artifactId>
+      <version>${geode.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-exec</artifactId>
+      <version>1.1</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <version>1.9.5</version>
+      <scope>test</scope>
+    </dependency>
+
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-deploy-plugin</artifactId>
+        <version>2.7</version>
+        <configuration>
+          <skip>true</skip>
+        </configuration>
+      </plugin>
+
+      <plugin>
+        <artifactId>maven-enforcer-plugin</artifactId>
+        <version>1.3.1</version>
+        <executions>
+          <execution>
+            <id>enforce</id>
+            <phase>none</phase>
+          </execution>
+        </executions>
+      </plugin>
+
+      <plugin>
+        <artifactId>maven-dependency-plugin</artifactId>
+        <version>2.8</version>
+        <executions>
+          <execution>
+            <id>copy-dependencies</id>
+            <phase>package</phase>
+            <goals>
+              <goal>copy-dependencies</goal>
+            </goals>
+            <configuration>
+              <outputDirectory>${project.build.directory}/../../interpreter/geode</outputDirectory>
+              <overWriteReleases>false</overWriteReleases>
+              <overWriteSnapshots>false</overWriteSnapshots>
+              <overWriteIfNewer>true</overWriteIfNewer>
+              <includeScope>runtime</includeScope>
+            </configuration>
+          </execution>
+          <execution>
+            <id>copy-artifact</id>
+            <phase>package</phase>
+            <goals>
+              <goal>copy</goal>
+            </goals>
+            <configuration>
+              <outputDirectory>${project.build.directory}/../../interpreter/geode</outputDirectory>
+              <overWriteReleases>false</overWriteReleases>
+              <overWriteSnapshots>false</overWriteSnapshots>
+              <overWriteIfNewer>true</overWriteIfNewer>
+              <includeScope>runtime</includeScope>
+              <artifactItems>
+                <artifactItem>
+                  <groupId>${project.groupId}</groupId>
+                  <artifactId>${project.artifactId}</artifactId>
+                  <version>${project.version}</version>
+                  <type>${project.packaging}</type>
+                </artifactItem>
+              </artifactItems>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/f04425bc/geode/src/main/java/org/apache/zeppelin/geode/GeodeOqlInterpreter.java
----------------------------------------------------------------------
diff --git a/geode/src/main/java/org/apache/zeppelin/geode/GeodeOqlInterpreter.java b/geode/src/main/java/org/apache/zeppelin/geode/GeodeOqlInterpreter.java
new file mode 100644
index 0000000..6f6b440
--- /dev/null
+++ b/geode/src/main/java/org/apache/zeppelin/geode/GeodeOqlInterpreter.java
@@ -0,0 +1,319 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.zeppelin.geode;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.InterpreterResult.Code;
+import org.apache.zeppelin.scheduler.Scheduler;
+import org.apache.zeppelin.scheduler.SchedulerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.gemstone.gemfire.cache.client.ClientCache;
+import com.gemstone.gemfire.cache.client.ClientCacheFactory;
+import com.gemstone.gemfire.cache.query.QueryService;
+import com.gemstone.gemfire.cache.query.SelectResults;
+import com.gemstone.gemfire.cache.query.Struct;
+import com.gemstone.gemfire.pdx.PdxInstance;
+
+/**
+ * Apache Geode OQL Interpreter (http://geode.incubator.apache.org)
+ * 
+ * <ul>
+ * <li>{@code geode.locator.host} - The Geode Locator {@code <HOST>} to connect to.</li>
+ * <li>{@code geode.locator.port} - The Geode Locator {@code <PORT>} to connect to.</li>
+ * <li>{@code geode.max.result} - Max number of OQL result to display.</li>
+ * </ul>
+ * <p>
+ * Sample usages: <br/>
+ * {@code %geode.oql} <br/>
+ * {@code SELECT * FROM /regionEmployee e WHERE e.companyId > 95} <br/>
+ * {@code SELECT * FROM /regionEmployee ORDER BY employeeId} <br/>
+ * {@code 
+ * SELECT * FROM /regionEmployee 
+ * WHERE companyId IN SET(1, 3, 7) OR lastName IN SET('NameA', 'NameB')
+ * } <br/>
+ * {@code 
+ * SELECT e.employeeId, c.id as companyId FROM /regionEmployee e, /regionCompany c 
+ * WHERE e.companyId = c.id
+ * }
+ * </p>
+ * <p>
+ * OQL specification and sample queries:
+ * http://geode-docs.cfapps.io/docs/getting_started/querying_quick_reference.html
+ * </p>
+ * <p>
+ * When the Zeppelin server is collocated with Geode Shell (gfsh) one can use the %sh interpreter to
+ * run Geode shell commands: <br/>
+ * {@code 
+ * %sh
+ *  source /etc/geode/conf/geode-env.sh
+ *  gfsh << EOF
+ *    connect --locator=ambari.localdomain[10334]
+ *    destroy region --name=/regionEmployee
+ *    create region --name=regionEmployee --type=REPLICATE
+ *    exit;
+ *  EOF
+ *}
+ * </p>
+ * <p>
+ * Known issue:http://gemfire.docs.pivotal.io/bugnotes/KnownIssuesGemFire810.html #43673 Using query
+ * "select * from /exampleRegion.entrySet" fails in a client-server topology and/or in a
+ * PartitionedRegion.
+ * </p>
+ */
+public class GeodeOqlInterpreter extends Interpreter {
+
+  private Logger logger = LoggerFactory.getLogger(GeodeOqlInterpreter.class);
+
+  public static final String DEFAULT_PORT = "10334";
+  public static final String DEFAULT_HOST = "localhost";
+  public static final String DEFAULT_MAX_RESULT = "1000";
+
+  private static final char NEWLINE = '\n';
+  private static final char TAB = '\t';
+  private static final char WHITESPACE = ' ';
+
+  private static final String TABLE_MAGIC_TAG = "%table ";
+
+  public static final String LOCATOR_HOST = "geode.locator.host";
+  public static final String LOCATOR_PORT = "geode.locator.port";
+  public static final String MAX_RESULT = "geode.max.result";
+
+  static {
+    Interpreter.register(
+        "oql",
+        "geode",
+        GeodeOqlInterpreter.class.getName(),
+        new InterpreterPropertyBuilder().add(LOCATOR_HOST, DEFAULT_HOST, "The Geode Locator Host.")
+            .add(LOCATOR_PORT, DEFAULT_PORT, "The Geode Locator Port")
+            .add(MAX_RESULT, DEFAULT_MAX_RESULT, "Max number of OQL result to display.").build());
+  }
+
+  private ClientCache clientCache = null;
+  private QueryService queryService = null;
+  private Exception exceptionOnConnect;
+  private int maxResult;
+
+  public GeodeOqlInterpreter(Properties property) {
+    super(property);
+  }
+
+  protected ClientCache getClientCache() {
+
+    String locatorHost = getProperty(LOCATOR_HOST);
+    int locatorPort = Integer.valueOf(getProperty(LOCATOR_PORT));
+
+    ClientCache clientCache =
+        new ClientCacheFactory().addPoolLocator(locatorHost, locatorPort).create();
+
+    return clientCache;
+  }
+
+  @Override
+  public void open() {
+    logger.info("Geode open connection called!");
+
+    // Close the previous open connections.
+    close();
+
+    try {
+      maxResult = Integer.valueOf(getProperty(MAX_RESULT));
+
+      clientCache = getClientCache();
+      queryService = clientCache.getQueryService();
+
+      exceptionOnConnect = null;
+      logger.info("Successfully created Geode connection");
+    } catch (Exception e) {
+      logger.error("Cannot open connection", e);
+      exceptionOnConnect = e;
+    }
+  }
+
+  @Override
+  public void close() {
+    try {
+      if (clientCache != null) {
+        clientCache.close();
+      }
+
+      if (queryService != null) {
+        queryService.closeCqs();
+      }
+
+    } catch (Exception e) {
+      logger.error("Cannot close connection", e);
+    } finally {
+      clientCache = null;
+      queryService = null;
+      exceptionOnConnect = null;
+    }
+  }
+
+  private InterpreterResult executeOql(String oql) {
+    try {
+
+      if (getExceptionOnConnect() != null) {
+        return new InterpreterResult(Code.ERROR, getExceptionOnConnect().getMessage());
+      }
+
+      @SuppressWarnings("unchecked")
+      SelectResults<Object> results =
+          (SelectResults<Object>) getQueryService().newQuery(oql).execute();
+
+      StringBuilder msg = new StringBuilder(TABLE_MAGIC_TAG);
+      boolean isTableHeaderSet = false;
+
+      Iterator<Object> iterator = results.iterator();
+      int rowDisplayCount = 0;
+
+      while (iterator.hasNext() && (rowDisplayCount < getMaxResult())) {
+
+        Object entry = iterator.next();
+        rowDisplayCount++;
+
+        if (entry instanceof Number) {
+          handleNumberEntry(isTableHeaderSet, entry, msg);
+        } else if (entry instanceof Struct) {
+          handleStructEntry(isTableHeaderSet, entry, msg);
+        } else if (entry instanceof PdxInstance) {
+          handlePdxInstanceEntry(isTableHeaderSet, entry, msg);
+        } else {
+          handleUnsupportedTypeEntry(isTableHeaderSet, entry, msg);
+        }
+
+        isTableHeaderSet = true;
+        msg.append(NEWLINE);
+      }
+
+      return new InterpreterResult(Code.SUCCESS, msg.toString());
+
+    } catch (Exception ex) {
+      logger.error("Cannot run " + oql, ex);
+      return new InterpreterResult(Code.ERROR, ex.getMessage());
+    }
+  }
+
+  /**
+   * Zeppelin's %TABLE convention uses tab (\t) to delimit fields and new-line (\n) to delimit rows
+   * To complain with this convention we need to replace any occurrences of tab and/or newline
+   * characters in the content.
+   */
+  private String replaceReservedChars(String str) {
+
+    if (StringUtils.isBlank(str)) {
+      return str;
+    }
+
+    return str.replace(TAB, WHITESPACE).replace(NEWLINE, WHITESPACE);
+  }
+
+  private void handleStructEntry(boolean isHeaderSet, Object entry, StringBuilder msg) {
+    Struct struct = (Struct) entry;
+    if (!isHeaderSet) {
+      for (String titleName : struct.getStructType().getFieldNames()) {
+        msg.append(replaceReservedChars(titleName)).append(TAB);
+      }
+      msg.append(NEWLINE);
+    }
+
+    for (String titleName : struct.getStructType().getFieldNames()) {
+      msg.append(replaceReservedChars("" + struct.get(titleName))).append(TAB);
+    }
+  }
+
+  private void handlePdxInstanceEntry(boolean isHeaderSet, Object entry, StringBuilder msg) {
+    PdxInstance pdxEntry = (PdxInstance) entry;
+    if (!isHeaderSet) {
+      for (String titleName : pdxEntry.getFieldNames()) {
+        msg.append(replaceReservedChars(titleName)).append(TAB);
+      }
+      msg.append(NEWLINE);
+    }
+
+    for (String titleName : pdxEntry.getFieldNames()) {
+      msg.append(replaceReservedChars("" + pdxEntry.getField(titleName))).append(TAB);
+    }
+  }
+
+  private void handleNumberEntry(boolean isHeaderSet, Object entry, StringBuilder msg) {
+    if (!isHeaderSet) {
+      msg.append("Result").append(NEWLINE);
+    }
+    msg.append((Number) entry);
+  }
+
+  private void handleUnsupportedTypeEntry(boolean isHeaderSet, Object entry, StringBuilder msg) {
+    if (!isHeaderSet) {
+      msg.append("Unsuppoted Type").append(NEWLINE);
+    }
+    msg.append("" + entry);
+  }
+
+
+  @Override
+  public InterpreterResult interpret(String cmd, InterpreterContext contextInterpreter) {
+    logger.info("Run OQL command '{}'", cmd);
+    return executeOql(cmd);
+  }
+
+  @Override
+  public void cancel(InterpreterContext context) {
+    // Do nothing
+  }
+
+  @Override
+  public FormType getFormType() {
+    return FormType.SIMPLE;
+  }
+
+  @Override
+  public int getProgress(InterpreterContext context) {
+    return 0;
+  }
+
+  @Override
+  public Scheduler getScheduler() {
+    return SchedulerFactory.singleton().createOrGetFIFOScheduler(
+        GeodeOqlInterpreter.class.getName() + this.hashCode());
+  }
+
+  @Override
+  public List<String> completion(String buf, int cursor) {
+    return null;
+  }
+
+  public int getMaxResult() {
+    return maxResult;
+  }
+
+  // Test only
+  QueryService getQueryService() {
+    return this.queryService;
+  }
+
+  Exception getExceptionOnConnect() {
+    return this.exceptionOnConnect;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/f04425bc/geode/src/test/java/org/apache/zeppelin/geode/GeodeOqlInterpreterTest.java
----------------------------------------------------------------------
diff --git a/geode/src/test/java/org/apache/zeppelin/geode/GeodeOqlInterpreterTest.java b/geode/src/test/java/org/apache/zeppelin/geode/GeodeOqlInterpreterTest.java
new file mode 100644
index 0000000..78755eb
--- /dev/null
+++ b/geode/src/test/java/org/apache/zeppelin/geode/GeodeOqlInterpreterTest.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.zeppelin.geode;
+
+import static org.apache.zeppelin.geode.GeodeOqlInterpreter.*;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.Properties;
+
+import org.apache.zeppelin.interpreter.Interpreter.FormType;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.InterpreterResult.Code;
+import org.junit.Test;
+
+import com.gemstone.gemfire.cache.query.QueryService;
+import com.gemstone.gemfire.cache.query.SelectResults;
+import com.gemstone.gemfire.cache.query.Struct;
+import com.gemstone.gemfire.cache.query.internal.StructImpl;
+import com.gemstone.gemfire.cache.query.internal.types.StructTypeImpl;
+import com.gemstone.gemfire.pdx.PdxInstance;
+import com.gemstone.gemfire.pdx.internal.PdxInstanceImpl;
+import com.gemstone.gemfire.pdx.internal.PdxType;
+
+public class GeodeOqlInterpreterTest {
+
+  private static final String OQL_QUERY = "select * from /region";
+
+  private static Iterator<Object> asIterator(Object... items) {
+    return new ArrayList<Object>(Arrays.asList(items)).iterator();
+  }
+
+  @Test
+  public void testOpenCommandIndempotency() {
+
+    Properties properties = new Properties();
+    properties.put(LOCATOR_HOST, DEFAULT_HOST);
+    properties.put(LOCATOR_PORT, DEFAULT_PORT);
+    properties.put(MAX_RESULT, DEFAULT_MAX_RESULT);
+
+    GeodeOqlInterpreter spyGeodeOqlInterpreter = spy(new GeodeOqlInterpreter(properties));
+
+    // Ensure that an attempt to open new connection will clean any remaining connections
+    spyGeodeOqlInterpreter.open();
+    spyGeodeOqlInterpreter.open();
+    spyGeodeOqlInterpreter.open();
+
+    verify(spyGeodeOqlInterpreter, times(3)).open();
+    verify(spyGeodeOqlInterpreter, times(3)).close();
+  }
+
+  @Test
+  public void oqlNumberResponse() throws Exception {
+    testOql(asIterator(66, 67), "Result\n66\n67\n", 10);
+    testOql(asIterator(66, 67), "Result\n66\n", 1);
+  }
+
+  @Test
+  public void oqlStructResponse() throws Exception {
+    String[] fields = new String[] {"field1", "field2"};
+    Struct s1 = new StructImpl(new StructTypeImpl(fields), new String[] {"val11", "val12"});
+    Struct s2 = new StructImpl(new StructTypeImpl(fields), new String[] {"val21", "val22"});
+
+    testOql(asIterator(s1, s2), "field1\tfield2\t\nval11\tval12\t\nval21\tval22\t\n", 10);
+    testOql(asIterator(s1, s2), "field1\tfield2\t\nval11\tval12\t\n", 1);
+  }
+
+  @Test
+  public void oqlStructResponseWithReservedCharacters() throws Exception {
+    String[] fields = new String[] {"fi\teld1", "f\nield2"};
+    Struct s1 = new StructImpl(new StructTypeImpl(fields), new String[] {"v\nal\t1", "val2"});
+
+    testOql(asIterator(s1), "fi eld1\tf ield2\t\nv al 1\tval2\t\n", 10);
+  }
+
+  @Test
+  public void oqlPdxInstanceResponse() throws Exception {
+    ByteArrayInputStream bais = new ByteArrayInputStream("koza\tboza\n".getBytes());
+    PdxInstance pdx1 = new PdxInstanceImpl(new PdxType(), new DataInputStream(bais), 4);
+    PdxInstance pdx2 = new PdxInstanceImpl(new PdxType(), new DataInputStream(bais), 4);
+
+    testOql(asIterator(pdx1, pdx2), "\n\n\n", 10);
+    testOql(asIterator(pdx1, pdx2), "\n\n", 1);
+  }
+
+  private static class DummyUnspportedType {
+    @Override
+    public String toString() {
+      return "Unsupported Indeed";
+    }
+  }
+
+  @Test
+  public void oqlUnsupportedTypeResponse() throws Exception {
+    DummyUnspportedType unspported1 = new DummyUnspportedType();
+    DummyUnspportedType unspported2 = new DummyUnspportedType();
+
+    testOql(asIterator(unspported1, unspported2), "Unsuppoted Type\n" + unspported1.toString()
+        + "\n" + unspported1.toString() + "\n", 10);
+  }
+
+  private void testOql(Iterator<Object> queryResponseIterator, String expectedOutput, int maxResult)
+      throws Exception {
+
+    GeodeOqlInterpreter spyGeodeOqlInterpreter = spy(new GeodeOqlInterpreter(new Properties()));
+
+    QueryService mockQueryService = mock(QueryService.class, RETURNS_DEEP_STUBS);
+
+    when(spyGeodeOqlInterpreter.getQueryService()).thenReturn(mockQueryService);
+    when(spyGeodeOqlInterpreter.getMaxResult()).thenReturn(maxResult);
+
+    @SuppressWarnings("unchecked")
+    SelectResults<Object> mockResults = mock(SelectResults.class);
+
+    when(mockQueryService.newQuery(eq(OQL_QUERY)).execute()).thenReturn(mockResults);
+
+    when(mockResults.iterator()).thenReturn(queryResponseIterator);
+
+    InterpreterResult interpreterResult = spyGeodeOqlInterpreter.interpret(OQL_QUERY, null);
+
+    assertEquals(Code.SUCCESS, interpreterResult.code());
+    assertEquals(expectedOutput, interpreterResult.message());
+  }
+
+  @Test
+  public void oqlWithQueryException() throws Exception {
+
+    GeodeOqlInterpreter spyGeodeOqlInterpreter = spy(new GeodeOqlInterpreter(new Properties()));
+
+    when(spyGeodeOqlInterpreter.getExceptionOnConnect()).thenReturn(
+        new RuntimeException("Test Exception On Connect"));
+
+    InterpreterResult interpreterResult = spyGeodeOqlInterpreter.interpret(OQL_QUERY, null);
+
+    assertEquals(Code.ERROR, interpreterResult.code());
+    assertEquals("Test Exception On Connect", interpreterResult.message());
+  }
+
+  @Test
+  public void oqlWithExceptionOnConnect() throws Exception {
+
+    GeodeOqlInterpreter spyGeodeOqlInterpreter = spy(new GeodeOqlInterpreter(new Properties()));
+
+    when(spyGeodeOqlInterpreter.getQueryService()).thenThrow(
+        new RuntimeException("Expected Test Exception!"));
+
+    InterpreterResult interpreterResult = spyGeodeOqlInterpreter.interpret(OQL_QUERY, null);
+
+    assertEquals(Code.ERROR, interpreterResult.code());
+    assertEquals("Expected Test Exception!", interpreterResult.message());
+  }
+
+  @Test
+  public void testFormType() {
+    assertEquals(FormType.SIMPLE, new GeodeOqlInterpreter(new Properties()).getFormType());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/f04425bc/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 9e5f54e..ecdebdd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -91,6 +91,7 @@
     <module>angular</module>
     <module>shell</module>
     <module>hive</module>
+    <module>geode</module>
     <module>tajo</module>
     <module>flink</module>
     <module>ignite</module>

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/f04425bc/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
index 223dc70..e25d3c0 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
@@ -394,7 +394,8 @@ public class ZeppelinConfiguration extends XMLConfiguration {
         + "org.apache.zeppelin.ignite.IgniteInterpreter,"
         + "org.apache.zeppelin.ignite.IgniteSqlInterpreter,"
         + "org.apache.zeppelin.lens.LensInterpreter,"
-        + "org.apache.zeppelin.cassandra.CassandraInterpreter"),
+        + "org.apache.zeppelin.cassandra.CassandraInterpreter,"
+        + "org.apache.zeppelin.geode.GeodeOqlInterpreter"),
     ZEPPELIN_INTERPRETER_DIR("zeppelin.interpreter.dir", "interpreter"),
     ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT("zeppelin.interpreter.connect.timeout", 30000),
     ZEPPELIN_ENCODING("zeppelin.encoding", "UTF-8"),