Posted to commits@zeppelin.apache.org by fe...@apache.org on 2016/10/15 19:26:56 UTC

zeppelin git commit: ZEPPELIN-335. Pig Interpreter

Repository: zeppelin
Updated Branches:
  refs/heads/master 5d5d758ce -> 465c51a41


ZEPPELIN-335. Pig Interpreter

### What is this PR for?
This PR builds on #338 but refactors most of the Pig interpreter, because I don't think the approach in #338 is the best one. #338 launches Pig scripts through the `bin/pig` script, which makes the job difficult to control (it is hard to kill the job and to get progress and stats info). This PR launches Pig scripts through the Pig API instead. It also adds another interpreter type, `%pig.query`, which leverages Zeppelin's display system. For details, see `pig.md`.

### What type of PR is it?
[Feature]

### Todos
* Syntax Highlight
* New interpreter type `%pig.udf`, so that users can write Pig UDFs in Zeppelin directly without building a UDF jar manually.

### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-335

### How should this be tested?
Unit tests are added, and manual testing was also done.

### Screenshots (if appropriate)

![image](https://cloud.githubusercontent.com/assets/164491/18986649/54217b4c-8730-11e6-9e33-25f98a98a9b6.png)

### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No

Author: Jeff Zhang <zj...@apache.org>
Author: Ali Bajwa <ab...@hortonworks.com>
Author: AhyoungRyu <ah...@apache.org>
Author: Jeff Zhang <zj...@gmail.com>

Closes #1476 from zjffdu/ZEPPELIN-335 and squashes the following commits:

73a07f0 [Jeff Zhang] minor update
a1b742b [Jeff Zhang] minor update on doc
e858301 [Jeff Zhang] address comments
c85a090 [Jeff Zhang] add license
58b4b2f [Jeff Zhang] minor update of docs
1ae7db2 [Jeff Zhang] Merge pull request #2 from AhyoungRyu/ZEPPELIN-335/docs
fe014a7 [AhyoungRyu] Fix docs title in front matter
df7a6db [AhyoungRyu] Add pig.md to dropdown menu
5e2e222 [AhyoungRyu] Minor update for pig.md
39f161a [Jeff Zhang] address comments
05a3b9b [Jeff Zhang] add pig.md
a09a7f7 [Jeff Zhang] refactor pig Interpreter
c28beb5 [Ali Bajwa] Updated based on comments: 1. Documentation: added pig.md with interpreter documentation and added pig entry to index.md 2. Added test junit test based on passwd file parsing example here https://pig.apache.org/docs/r0.10.0/start.html#run 3. Removed author tag from comment (this was copied from shell interpreter https://github.com/apache/incubator-zeppelin/blob/master/shell/src/main/java/org/apache/zeppelin/shell/ShellInterpreter.java#L42) 4. Implemented cancel functionality 5. Display output stream in case of error
2586336 [Ali Bajwa] exposed timeout and pig executable via interpreter and added comments
7abad20 [Ali Bajwa] initial commit of pig interpreter


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/465c51a4
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/465c51a4
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/465c51a4

Branch: refs/heads/master
Commit: 465c51a419347f89a7d73c8014b3b46f27f1d5b8
Parents: 5d5d758
Author: Jeff Zhang <zj...@apache.org>
Authored: Tue Oct 11 10:27:39 2016 +0800
Committer: Felix Cheung <fe...@apache.org>
Committed: Sat Oct 15 12:26:50 2016 -0700

----------------------------------------------------------------------
 bin/interpreter.sh                              |  22 ++
 conf/interpreter-list                           |   1 +
 conf/zeppelin-site.xml.template                 |   2 +-
 docs/_includes/themes/zeppelin/_navigation.html |   1 +
 docs/interpreter/pig.md                         |  97 ++++++
 pig/pom.xml                                     | 184 ++++++++++++
 .../apache/zeppelin/pig/BasePigInterpreter.java | 100 +++++++
 .../org/apache/zeppelin/pig/PigInterpreter.java | 137 +++++++++
 .../zeppelin/pig/PigQueryInterpreter.java       | 172 +++++++++++
 .../apache/zeppelin/pig/PigScriptListener.java  |  94 ++++++
 .../java/org/apache/zeppelin/pig/PigUtils.java  | 292 +++++++++++++++++++
 pig/src/main/resources/interpreter-setting.json |  46 +++
 .../apache/zeppelin/pig/PigInterpreterTest.java | 155 ++++++++++
 .../zeppelin/pig/PigQueryInterpreterTest.java   | 153 ++++++++++
 pig/src/test/resources/log4j.properties         |  22 ++
 pom.xml                                         |   1 +
 zeppelin-distribution/src/bin_license/LICENSE   |  10 +-
 .../zeppelin/conf/ZeppelinConfiguration.java    |   4 +-
 18 files changed, 1490 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/465c51a4/bin/interpreter.sh
----------------------------------------------------------------------
diff --git a/bin/interpreter.sh b/bin/interpreter.sh
index a81c8f2..4fb4b26 100755
--- a/bin/interpreter.sh
+++ b/bin/interpreter.sh
@@ -149,6 +149,28 @@ elif [[ "${INTERPRETER_ID}" == "hbase" ]]; then
   else
     echo "HBASE_HOME and HBASE_CONF_DIR are not set, configuration might not be loaded"
   fi
+elif [[ "${INTERPRETER_ID}" == "pig" ]]; then
+  # autodetect HADOOP_CONF_DIR by heuristic
+  if [[ -n "${HADOOP_HOME}" ]] && [[ -z "${HADOOP_CONF_DIR}" ]]; then
+    if [[ -d "${HADOOP_HOME}/etc/hadoop" ]]; then
+      export HADOOP_CONF_DIR="${HADOOP_HOME}/etc/hadoop"
+    elif [[ -d "/etc/hadoop/conf" ]]; then
+      export HADOOP_CONF_DIR="/etc/hadoop/conf"
+    fi
+  fi
+
+  if [[ -n "${HADOOP_CONF_DIR}" ]] && [[ -d "${HADOOP_CONF_DIR}" ]]; then
+    ZEPPELIN_INTP_CLASSPATH+=":${HADOOP_CONF_DIR}"
+  fi
+  
+  # autodetect TEZ_CONF_DIR
+  if [[ -n "${TEZ_CONF_DIR}" ]]; then
+    ZEPPELIN_INTP_CLASSPATH+=":${TEZ_CONF_DIR}"
+  elif [[ -d "/etc/tez/conf" ]]; then
+    ZEPPELIN_INTP_CLASSPATH+=":/etc/tez/conf"
+  else
+    echo "TEZ_CONF_DIR is not set, configuration might not be loaded"
+  fi
 fi
 
 addJarInDirForIntp "${LOCAL_INTERPRETER_REPO}"

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/465c51a4/conf/interpreter-list
----------------------------------------------------------------------
diff --git a/conf/interpreter-list b/conf/interpreter-list
index 098b3c6..38cb386 100644
--- a/conf/interpreter-list
+++ b/conf/interpreter-list
@@ -32,6 +32,7 @@ kylin           org.apache.zeppelin:zeppelin-kylin:0.6.1                Kylin in
 lens            org.apache.zeppelin:zeppelin-lens:0.6.1                 Lens interpreter
 livy            org.apache.zeppelin:zeppelin-livy:0.6.1                 Livy interpreter
 md              org.apache.zeppelin:zeppelin-markdown:0.6.1             Markdown support
+pig             org.apache.zeppelin:zeppelin-pig:0.6.1                  Pig interpreter
 postgresql      org.apache.zeppelin:zeppelin-postgresql:0.6.1           Postgresql interpreter
 python          org.apache.zeppelin:zeppelin-python:0.6.1               Python interpreter
 shell           org.apache.zeppelin:zeppelin-shell:0.6.1                Shell command

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/465c51a4/conf/zeppelin-site.xml.template
----------------------------------------------------------------------
diff --git a/conf/zeppelin-site.xml.template b/conf/zeppelin-site.xml.template
index 05bd719..c4b369c 100755
--- a/conf/zeppelin-site.xml.template
+++ b/conf/zeppelin-site.xml.template
@@ -190,7 +190,7 @@
 
 <property>
   <name>zeppelin.interpreters</name>
-  <value>org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.rinterpreter.RRepl,org.apache.zeppelin.rinterpreter.KnitR,org.apache.zeppelin.spark.SparkRInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.file.HDFSFileInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,,org.apache.zeppelin.python.PythonInterpreter,org.apache.zeppelin.lens.LensInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter,org.apache.zeppelin.cassandra.CassandraInterpreter,org.apache.zeppelin.geode.GeodeOqlInterpreter,org.apache.zeppelin.postgresql.PostgreSqlInterpreter,org.apache.zeppelin.jdbc.JDBCInterpreter,org.apache.zeppelin.kylin.KylinInterpreter,org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter,org.apache.zeppelin.scalding.ScaldingInterpreter,org.apache.zeppelin.alluxio.AlluxioInterpreter,org.apache.zeppelin.hbase.HbaseInterpreter,org.apache.zeppelin.livy.LivySparkInterpreter,org.apache.zeppelin.livy.LivyPySparkInterpreter,org.apache.zeppelin.livy.LivySparkRInterpreter,org.apache.zeppelin.livy.LivySparkSQLInterpreter,org.apache.zeppelin.bigquery.BigQueryInterpreter,org.apache.zeppelin.beam.BeamInterpreter</value>
+  <value>org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.rinterpreter.RRepl,org.apache.zeppelin.rinterpreter.KnitR,org.apache.zeppelin.spark.SparkRInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.file.HDFSFileInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,,org.apache.zeppelin.python.PythonInterpreter,org.apache.zeppelin.lens.LensInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter,org.apache.zeppelin.cassandra.CassandraInterpreter,org.apache.zeppelin.geode.GeodeOqlInterpreter,org.apache.zeppelin.postgresql.PostgreSqlInterpreter,org.apache.zeppelin.jdbc.JDBCInterpreter,org.apache.zeppelin.kylin.KylinInterpreter,org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter,org.apache.zeppelin.scalding.ScaldingInterpreter,org.apache.zeppelin.alluxio.AlluxioInterpreter,org.apache.zeppelin.hbase.HbaseInterpreter,org.apache.zeppelin.livy.LivySparkInterpreter,org.apache.zeppelin.livy.LivyPySparkInterpreter,org.apache.zeppelin.livy.LivySparkRInterpreter,org.apache.zeppelin.livy.LivySparkSQLInterpreter,org.apache.zeppelin.bigquery.BigQueryInterpreter,org.apache.zeppelin.beam.BeamInterpreter,org.apache.zeppelin.pig.PigInterpreter, org.apache.zeppelin.pig.PigQueryInterpreter</value>
   <description>Comma separated interpreter configurations. First interpreter become a default</description>
 </property>
 

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/465c51a4/docs/_includes/themes/zeppelin/_navigation.html
----------------------------------------------------------------------
diff --git a/docs/_includes/themes/zeppelin/_navigation.html b/docs/_includes/themes/zeppelin/_navigation.html
index a0e4485..9abcdb1 100644
--- a/docs/_includes/themes/zeppelin/_navigation.html
+++ b/docs/_includes/themes/zeppelin/_navigation.html
@@ -62,6 +62,7 @@
                 <li><a href="{{BASE_PATH}}/interpreter/lens.html">Lens</a></li>
                 <li><a href="{{BASE_PATH}}/interpreter/livy.html">Livy</a></li>
                 <li><a href="{{BASE_PATH}}/interpreter/markdown.html">Markdown</a></li>
+                <li><a href="{{BASE_PATH}}/interpreter/pig.html">Pig</a></li>
                 <li><a href="{{BASE_PATH}}/interpreter/python.html">Python</a></li>
                 <li><a href="{{BASE_PATH}}/interpreter/postgresql.html">Postgresql, HAWQ</a></li>
                 <li><a href="{{BASE_PATH}}/interpreter/r.html">R</a></li>

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/465c51a4/docs/interpreter/pig.md
----------------------------------------------------------------------
diff --git a/docs/interpreter/pig.md b/docs/interpreter/pig.md
new file mode 100644
index 0000000..227656b
--- /dev/null
+++ b/docs/interpreter/pig.md
@@ -0,0 +1,97 @@
+---
+layout: page
+title: "Pig Interpreter for Apache Zeppelin"
+description: "Apache Pig is a platform for analyzing large data sets that consists of a high-level language for expressing data analysis programs, coupled with infrastructure for evaluating these programs."
+group: manual
+---
+{% include JB/setup %}
+
+
+# Pig Interpreter for Apache Zeppelin
+
+<div id="toc"></div>
+
+## Overview
+[Apache Pig](https://pig.apache.org/) is a platform for analyzing large data sets that consists of a high-level language for expressing data analysis programs, coupled with infrastructure for evaluating these programs. The salient property of Pig programs is that their structure is amenable to substantial parallelization, which in turn enables them to handle very large data sets.
+
+## Supported interpreter type
+  - `%pig.script` (default)
+    
+    Any Pig script can run in this type of interpreter, and the display type is plain text.
+  
+  - `%pig.query`
+ 
+    Almost the same as `%pig.script`. The only differences are that you don't need to add an alias in the last statement and that the display type is table.
+
+## Supported runtime mode
+  - Local
+  - MapReduce
+  - Tez  (Only Tez 0.7 is supported)
+
+## How to use
+
+### How to setup Pig
+
+- Local Mode
+
+    Nothing needs to be done for local mode
+
+- MapReduce Mode
+
+    HADOOP\_CONF\_DIR needs to be specified in `ZEPPELIN_HOME/conf/zeppelin-env.sh`.
+
+- Tez Mode
+
+    HADOOP\_CONF\_DIR and TEZ\_CONF\_DIR need to be specified in `ZEPPELIN_HOME/conf/zeppelin-env.sh`.
+
+### How to configure interpreter
+
+In the Interpreters menu, you have to create a new Pig interpreter. The Pig interpreter has the following properties by default.
+
+<table class="table-configuration">
+    <tr>
+        <th>Property</th>
+        <th>Default</th>
+        <th>Description</th>
+    </tr>
+    <tr>
+        <td>zeppelin.pig.execType</td>
+        <td>mapreduce</td>
+        <td>Execution mode for pig runtime. local | mapreduce | tez </td>
+    </tr>
+    <tr>
+        <td>zeppelin.pig.includeJobStats</td>
+        <td>false</td>
+        <td>Whether to display jobStats info in <code>%pig.script</code></td>
+    </tr>
+    <tr>
+        <td>zeppelin.pig.maxResult</td>
+        <td>1000</td>
+        <td>Maximum number of rows displayed in <code>%pig.query</code></td>
+    </tr>
+</table>  
+
+### Example
+
+##### pig
+
+```
+%pig
+
+raw_data = load 'dataset/sf_crime/train.csv' using PigStorage(',') as (Dates,Category,Descript,DayOfWeek,PdDistrict,Resolution,Address,X,Y);
+b = group raw_data all;
+c = foreach b generate COUNT($1);
+dump c;
+```
+
+##### pig.query
+
+```
+%pig.query
+
+b = foreach raw_data generate Category;
+c = group b by Category;
+foreach c generate group as category, COUNT($1) as count;
+```
+
+Data is shared between `%pig` and `%pig.query`, so that you can do some common work in `%pig`, and do different kinds of query based on the data of `%pig`.
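
For readers unfamiliar with the table display mentioned in `pig.md`: in Zeppelin's display system, a table is a result whose text starts with `%table`, with columns separated by tabs and rows separated by newlines. The sketch below shows that format in plain Java, assuming this convention. `TableOutputSketch`, `toTable`, and the sample rows are hypothetical and made up for illustration; this is not the code in this commit. The `maxResult` parameter plays the role described for `zeppelin.pig.maxResult`.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for illustration only; not part of this commit.
class TableOutputSketch {

  // Builds "%table <header>\n<row>\n..." and keeps at most maxResult data rows.
  static String toTable(List<String> header, List<List<String>> rows, int maxResult) {
    StringBuilder sb = new StringBuilder("%table ");
    sb.append(String.join("\t", header)).append('\n');
    int limit = Math.min(rows.size(), maxResult);
    for (int i = 0; i < limit; i++) {
      sb.append(String.join("\t", rows.get(i))).append('\n');
    }
    return sb.toString();
  }

  public static void main(String[] args) {
    // Made-up sample values, not real query output.
    List<String> header = Arrays.asList("category", "count");
    List<List<String>> rows = Arrays.asList(
        Arrays.asList("A", "3"),
        Arrays.asList("B", "1"));
    System.out.print(toTable(header, rows, 1000));
  }
}
```

Capping the row count before building the string keeps a large result from overwhelming the notebook front end, which is presumably why a row limit is exposed as a property.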

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/465c51a4/pig/pom.xml
----------------------------------------------------------------------
diff --git a/pig/pom.xml b/pig/pom.xml
new file mode 100644
index 0000000..a4e5cbb
--- /dev/null
+++ b/pig/pom.xml
@@ -0,0 +1,184 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>zeppelin</artifactId>
+        <groupId>org.apache.zeppelin</groupId>
+        <version>0.7.0-SNAPSHOT</version>
+    </parent>
+
+    <groupId>org.apache.zeppelin</groupId>
+    <artifactId>zeppelin-pig</artifactId>
+    <packaging>jar</packaging>
+    <version>0.7.0-SNAPSHOT</version>
+    <name>Zeppelin: Apache Pig Interpreter</name>
+    <description>Zeppelin interpreter for Apache Pig</description>
+    <url>http://zeppelin.apache.org</url>
+
+    <properties>
+        <pig.version>0.16.0</pig.version>
+        <hadoop.version>2.6.0</hadoop.version>
+        <tez.version>0.7.0</tez.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.zeppelin</groupId>
+            <artifactId>zeppelin-interpreter</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.pig</groupId>
+            <artifactId>pig</artifactId>
+            <classifier>h2</classifier>
+            <version>${pig.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client</artifactId>
+            <version>${hadoop.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.tez</groupId>
+            <artifactId>tez-api</artifactId>
+            <version>${tez.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.tez</groupId>
+            <artifactId>tez-common</artifactId>
+            <version>${tez.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.tez</groupId>
+            <artifactId>tez-dag</artifactId>
+            <version>${tez.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.tez</groupId>
+            <artifactId>tez-runtime-library</artifactId>
+            <version>${tez.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.tez</groupId>
+            <artifactId>tez-runtime-internals</artifactId>
+            <version>${tez.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.tez</groupId>
+            <artifactId>tez-mapreduce</artifactId>
+            <version>${tez.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.tez</groupId>
+            <artifactId>tez-yarn-timeline-history-with-acls</artifactId>
+            <version>${tez.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <artifactId>maven-enforcer-plugin</artifactId>
+                <version>1.3.1</version>
+                <executions>
+                    <execution>
+                        <id>enforce</id>
+                        <phase>none</phase>
+                    </execution>
+                </executions>
+            </plugin>
+
+            <plugin>
+                <artifactId>maven-dependency-plugin</artifactId>
+                <version>2.8</version>
+                <executions>
+                    <execution>
+                        <id>copy-dependencies</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>copy-dependencies</goal>
+                        </goals>
+                        <configuration>
+                            <outputDirectory>${project.build.directory}/../../interpreter/pig
+                            </outputDirectory>
+                            <overWriteReleases>false</overWriteReleases>
+                            <overWriteSnapshots>false</overWriteSnapshots>
+                            <overWriteIfNewer>true</overWriteIfNewer>
+                            <includeScope>runtime</includeScope>
+                        </configuration>
+                    </execution>
+                    <execution>
+                        <id>copy-artifact</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>copy</goal>
+                        </goals>
+                        <configuration>
+                            <outputDirectory>${project.build.directory}/../../interpreter/pig
+                            </outputDirectory>
+                            <overWriteReleases>false</overWriteReleases>
+                            <overWriteSnapshots>false</overWriteSnapshots>
+                            <overWriteIfNewer>true</overWriteIfNewer>
+                            <includeScope>runtime</includeScope>
+                            <artifactItems>
+                                <artifactItem>
+                                    <groupId>${project.groupId}</groupId>
+                                    <artifactId>${project.artifactId}</artifactId>
+                                    <version>${project.version}</version>
+                                    <type>${project.packaging}</type>
+                                </artifactItem>
+                            </artifactItems>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/465c51a4/pig/src/main/java/org/apache/zeppelin/pig/BasePigInterpreter.java
----------------------------------------------------------------------
diff --git a/pig/src/main/java/org/apache/zeppelin/pig/BasePigInterpreter.java b/pig/src/main/java/org/apache/zeppelin/pig/BasePigInterpreter.java
new file mode 100644
index 0000000..0aa8a20
--- /dev/null
+++ b/pig/src/main/java/org/apache/zeppelin/pig/BasePigInterpreter.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.pig;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.BackendException;
+import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
+import org.apache.pig.backend.hadoop.executionengine.Launcher;
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.scheduler.Scheduler;
+import org.apache.zeppelin.scheduler.SchedulerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Field;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ *
+ */
+public abstract class BasePigInterpreter extends Interpreter {
+
+  private static Logger LOGGER = LoggerFactory.getLogger(BasePigInterpreter.class);
+
+  protected ConcurrentHashMap<String, PigScriptListener> listenerMap = new ConcurrentHashMap<>();
+
+  public BasePigInterpreter(Properties property) {
+    super(property);
+  }
+
+  @Override
+  public void cancel(InterpreterContext context) {
+    LOGGER.info("Cancel paragraph:" + context.getParagraphId());
+    PigScriptListener listener = listenerMap.get(context.getParagraphId());
+    if (listener != null) {
+      Set<String> jobIds = listener.getJobIds();
+      if (jobIds.isEmpty()) {
+        LOGGER.info("No job is started, so can not cancel paragraph:" + context.getParagraphId());
+      }
+      for (String jobId : jobIds) {
+        LOGGER.info("Kill jobId:" + jobId);
+        HExecutionEngine engine =
+                (HExecutionEngine) getPigServer().getPigContext().getExecutionEngine();
+        try {
+          Field launcherField = HExecutionEngine.class.getDeclaredField("launcher");
+          launcherField.setAccessible(true);
+          Launcher launcher = (Launcher) launcherField.get(engine);
+          // It doesn't work for Tez Engine due to PIG-5035
+          launcher.killJob(jobId, new Configuration());
+        } catch (NoSuchFieldException | BackendException | IllegalAccessException e) {
+          LOGGER.error("Fail to cancel paragraph:" + context.getParagraphId(), e);
+        }
+      }
+    } else {
+      LOGGER.warn("No PigScriptListener found, can not cancel paragraph:"
+              + context.getParagraphId());
+    }
+  }
+
+  @Override
+  public FormType getFormType() {
+    return FormType.SIMPLE;
+  }
+
+  @Override
+  public int getProgress(InterpreterContext context) {
+    PigScriptListener listener = listenerMap.get(context.getParagraphId());
+    if (listener != null) {
+      return listener.getProgress();
+    }
+    return 0;
+  }
+
+  @Override
+  public Scheduler getScheduler() {
+    return SchedulerFactory.singleton().createOrGetFIFOScheduler(
+            PigInterpreter.class.getName() + this.hashCode());
+  }
+
+  public abstract PigServer getPigServer();
+}
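
The `cancel` and `getProgress` methods above both rely on one pattern: a listener is stored in `listenerMap` under the paragraph ID while a script runs, and is looked up by that ID later. The following is a minimal, self-contained sketch of that pattern in plain Java. `ProgressRegistrySketch`, `SimpleListener`, and `run` are hypothetical stand-ins; the real `PigScriptListener` depends on Pig classes and is not reproduced here.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical sketch of the paragraph-ID-to-listener registry; not part of this commit.
class ProgressRegistrySketch {

  // Stand-in for PigScriptListener; it only tracks a percentage.
  static class SimpleListener {
    private volatile int progress = 0;

    void setProgress(int value) {
      progress = value;
    }

    int getProgress() {
      return progress;
    }
  }

  private final ConcurrentHashMap<String, SimpleListener> listenerMap =
      new ConcurrentHashMap<>();

  // Registers a listener for the duration of the work and always removes it afterwards.
  void run(String paragraphId, Consumer<SimpleListener> work) {
    SimpleListener listener = new SimpleListener();
    listenerMap.put(paragraphId, listener);
    try {
      work.accept(listener);
    } finally {
      listenerMap.remove(paragraphId);
    }
  }

  // Returns 0 when no script is running for the paragraph, as getProgress does above.
  int getProgress(String paragraphId) {
    SimpleListener listener = listenerMap.get(paragraphId);
    return listener != null ? listener.getProgress() : 0;
  }

  public static void main(String[] args) {
    ProgressRegistrySketch registry = new ProgressRegistrySketch();
    registry.run("paragraph_1", listener -> {
      listener.setProgress(50);
      System.out.println(registry.getProgress("paragraph_1")); // prints 50
    });
    System.out.println(registry.getProgress("paragraph_1")); // prints 0
  }
}
```

Removing the entry in `finally` means a lookup after the run finds no listener, which is the case the `else` branch of `cancel` logs as a warning.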

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/465c51a4/pig/src/main/java/org/apache/zeppelin/pig/PigInterpreter.java
----------------------------------------------------------------------
diff --git a/pig/src/main/java/org/apache/zeppelin/pig/PigInterpreter.java b/pig/src/main/java/org/apache/zeppelin/pig/PigInterpreter.java
new file mode 100644
index 0000000..8cd1efc
--- /dev/null
+++ b/pig/src/main/java/org/apache/zeppelin/pig/PigInterpreter.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.pig;
+
+import org.apache.commons.io.output.ByteArrayOutputStream;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.pig.PigServer;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.tools.pigstats.*;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.InterpreterResult.Code;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.*;
+
+/**
+ * Pig interpreter for Zeppelin.
+ */
+public class PigInterpreter extends BasePigInterpreter {
+  private static Logger LOGGER = LoggerFactory.getLogger(PigInterpreter.class);
+
+  private PigServer pigServer;
+  private boolean includeJobStats = false;
+
+  public PigInterpreter(Properties property) {
+    super(property);
+  }
+
+  @Override
+  public void open() {
+    String execType = getProperty("zeppelin.pig.execType");
+    if (execType == null) {
+      execType = "mapreduce";
+    }
+    String includeJobStats = getProperty("zeppelin.pig.includeJobStats");
+    if (includeJobStats != null) {
+      this.includeJobStats = Boolean.parseBoolean(includeJobStats);
+    }
+    try {
+      pigServer = new PigServer(execType);
+    } catch (IOException e) {
+      LOGGER.error("Fail to initialize PigServer", e);
+      throw new RuntimeException("Fail to initialize PigServer", e);
+    }
+  }
+
+  @Override
+  public void close() {
+    pigServer = null;
+  }
+
+
+  @Override
+  public InterpreterResult interpret(String cmd, InterpreterContext contextInterpreter) {
+    // remember the original stdout, because we will redirect stdout to capture
+    // the pig dump output.
+    PrintStream originalStdOut = System.out;
+    ByteArrayOutputStream bytesOutput = new ByteArrayOutputStream();
+    File tmpFile = null;
+    try {
+      tmpFile = PigUtils.createTempPigScript(cmd);
+      System.setOut(new PrintStream(bytesOutput));
+      // each thread should have its own ScriptState & PigStats
+      ScriptState.start(pigServer.getPigContext().getExecutionEngine().instantiateScriptState());
+      // reset PigStats, otherwise you may get the PigStats of last job in the same thread
+      // because PigStats is ThreadLocal variable
+      PigStats.start(pigServer.getPigContext().getExecutionEngine().instantiatePigStats());
+      PigScriptListener scriptListener = new PigScriptListener();
+      ScriptState.get().registerListener(scriptListener);
+      listenerMap.put(contextInterpreter.getParagraphId(), scriptListener);
+      pigServer.registerScript(tmpFile.getAbsolutePath());
+    } catch (IOException e) {
+      if (e instanceof FrontendException) {
+        FrontendException fe = (FrontendException) e;
+        if (!fe.getMessage().contains("Backend error :")) {
+          // If the error message contains "Backend error :", that means the exception is from
+          // backend.
+          LOGGER.error("Fail to run pig script.", e);
+          return new InterpreterResult(Code.ERROR, ExceptionUtils.getStackTrace(e));
+        }
+      }
+      PigStats stats = PigStats.get();
+      if (stats != null) {
+        String errorMsg = PigUtils.extactJobStats(stats);
+        if (errorMsg != null) {
+          LOGGER.error("Fail to run pig script, " + errorMsg);
+          return new InterpreterResult(Code.ERROR, errorMsg);
+        }
+      }
+      LOGGER.error("Fail to run pig script.", e);
+      return new InterpreterResult(Code.ERROR, ExceptionUtils.getStackTrace(e));
+    } finally {
+      System.setOut(originalStdOut);
+      listenerMap.remove(contextInterpreter.getParagraphId());
+      if (tmpFile != null) {
+        tmpFile.delete();
+      }
+    }
+    StringBuilder outputBuilder = new StringBuilder();
+    PigStats stats = PigStats.get();
+    if (stats != null && includeJobStats) {
+      String jobStats = PigUtils.extactJobStats(stats);
+      if (jobStats != null) {
+        outputBuilder.append(jobStats);
+      }
+    }
+    outputBuilder.append(bytesOutput.toString());
+    return new InterpreterResult(Code.SUCCESS, outputBuilder.toString());
+  }
+
+
+  public PigServer getPigServer() {
+    return pigServer;
+  }
+
+}
+
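
The `finally` block above restores `System.out` once the script has run, and the captured bytes are then appended to the interpreter result. A minimal, self-contained sketch of that capture-and-restore pattern follows. `StdoutCaptureSketch` and `capture` are hypothetical names used only for illustration; they are not part of this commit.

```java
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;

// Hypothetical helper, not part of this commit: redirects System.out while a
// task runs and always restores the original stream afterwards.
class StdoutCaptureSketch {

  static String capture(Runnable task) {
    PrintStream originalStdOut = System.out;
    ByteArrayOutputStream bytesOutput = new ByteArrayOutputStream();
    PrintStream redirected = new PrintStream(bytesOutput, true);
    System.setOut(redirected);
    try {
      task.run();
    } finally {
      // Restore even when the task throws, as the interpreter's finally block does.
      System.setOut(originalStdOut);
      redirected.flush();
    }
    return bytesOutput.toString();
  }

  public static void main(String[] args) {
    String out = capture(() -> System.out.print("(1,andy)"));
    System.out.println("captured: " + out);
  }
}
```

Because `System.out` is process-wide, a redirect like this may also pick up output from other threads that print while the task is running.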

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/465c51a4/pig/src/main/java/org/apache/zeppelin/pig/PigQueryInterpreter.java
----------------------------------------------------------------------
diff --git a/pig/src/main/java/org/apache/zeppelin/pig/PigQueryInterpreter.java b/pig/src/main/java/org/apache/zeppelin/pig/PigQueryInterpreter.java
new file mode 100644
index 0000000..c763b7f
--- /dev/null
+++ b/pig/src/main/java/org/apache/zeppelin/pig/PigQueryInterpreter.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.zeppelin.pig;
+
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.pig.PigServer;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.tools.pigstats.PigStats;
+import org.apache.pig.tools.pigstats.ScriptState;
+import org.apache.zeppelin.interpreter.*;
+import org.apache.zeppelin.interpreter.InterpreterResult.Code;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Pig interpreter that runs a query and renders the result with Zeppelin's %table display.
+ */
+public class PigQueryInterpreter extends BasePigInterpreter {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(PigQueryInterpreter.class);
+  private PigServer pigServer;
+  private int maxResult;
+
+  public PigQueryInterpreter(Properties properties) {
+    super(properties);
+  }
+
+  @Override
+  public void open() {
+    pigServer = getPigInterpreter().getPigServer();
+    maxResult = Integer.parseInt(getProperty("zeppelin.pig.maxResult"));
+  }
+
+  @Override
+  public void close() {
+
+  }
+
+  @Override
+  public InterpreterResult interpret(String st, InterpreterContext context) {
+    // '-' is invalid for pig alias
+    String alias = "paragraph_" + context.getParagraphId().replace("-", "_");
+    String[] lines = st.split("\n");
+    List<String> queries = new ArrayList<String>();
+    for (int i = 0; i < lines.length; ++i) {
+      if (i == lines.length - 1) {
+        lines[i] = alias + " = " + lines[i];
+      }
+      queries.add(lines[i]);
+    }
+
+    StringBuilder resultBuilder = new StringBuilder("%table ");
+    try {
+      File tmpScriptFile = PigUtils.createTempPigScript(queries);
+      // each thread should have its own ScriptState & PigStats
+      ScriptState.start(pigServer.getPigContext().getExecutionEngine().instantiateScriptState());
+      // reset PigStats, otherwise you may get the PigStats of the last job in the same thread
+      // because PigStats is a ThreadLocal variable
+      PigStats.start(pigServer.getPigContext().getExecutionEngine().instantiatePigStats());
+      PigScriptListener scriptListener = new PigScriptListener();
+      ScriptState.get().registerListener(scriptListener);
+      listenerMap.put(context.getParagraphId(), scriptListener);
+      pigServer.registerScript(tmpScriptFile.getAbsolutePath());
+      Schema schema = pigServer.dumpSchema(alias);
+      boolean schemaKnown = (schema != null);
+      if (schemaKnown) {
+        for (int i = 0; i < schema.size(); ++i) {
+          Schema.FieldSchema field = schema.getField(i);
+          resultBuilder.append(field.alias);
+          if (i != schema.size() - 1) {
+            resultBuilder.append("\t");
+          }
+        }
+        resultBuilder.append("\n");
+      }
+      Iterator<Tuple> iter = pigServer.openIterator(alias);
+      boolean firstRow = true;
+      int index = 0;
+      while (iter.hasNext() && index < maxResult) {
+        index++;
+        Tuple tuple = iter.next();
+        if (firstRow && !schemaKnown) {
+          for (int i = 0; i < tuple.size(); ++i) {
+            resultBuilder.append("c_" + i + "\t");
+          }
+          resultBuilder.append("\n");
+          firstRow = false;
+        }
+        resultBuilder.append(StringUtils.join(tuple, "\t"));
+        resultBuilder.append("\n");
+      }
+      if (index >= maxResult && iter.hasNext()) {
+        resultBuilder.append("\n<font color=red>Results are limited by " + maxResult + ".</font>");
+      }
+    } catch (IOException e) {
+      // Extract the error in the following order:
+      // 1. FrontendException, which is thrown in the query compilation phase.
+      // 2. PigStats, which carries the execution error.
+      // 3. Any other error.
+      if (e instanceof FrontendException) {
+        FrontendException fe = (FrontendException) e;
+        if (!fe.getMessage().contains("Backend error :")) {
+          LOGGER.error("Fail to run pig query.", e);
+          return new InterpreterResult(Code.ERROR, ExceptionUtils.getStackTrace(e));
+        }
+      }
+      PigStats stats = PigStats.get();
+      if (stats != null) {
+        String errorMsg = PigUtils.extactJobStats(stats);
+        if (errorMsg != null) {
+          return new InterpreterResult(Code.ERROR, errorMsg);
+        }
+      }
+      LOGGER.error("Fail to run pig query.", e);
+      return new InterpreterResult(Code.ERROR, ExceptionUtils.getStackTrace(e));
+    } finally {
+      listenerMap.remove(context.getParagraphId());
+    }
+    return new InterpreterResult(Code.SUCCESS, resultBuilder.toString());
+  }
+
+  @Override
+  public PigServer getPigServer() {
+    return this.pigServer;
+  }
+
+  private PigInterpreter getPigInterpreter() {
+    LazyOpenInterpreter lazy = null;
+    PigInterpreter pig = null;
+    Interpreter p = getInterpreterInTheSameSessionByClassName(PigInterpreter.class.getName());
+
+    while (p instanceof WrappedInterpreter) {
+      if (p instanceof LazyOpenInterpreter) {
+        lazy = (LazyOpenInterpreter) p;
+      }
+      p = ((WrappedInterpreter) p).getInnerInterpreter();
+    }
+    pig = (PigInterpreter) p;
+
+    if (lazy != null) {
+      lazy.open();
+    }
+    return pig;
+  }
+}
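
As a rough illustration of what `interpret` does in the file above, the sketch below derives the paragraph alias, assigns the last line of the paragraph to it, and renders rows as `%table` text capped at `maxResult` rows. It uses plain Java collections instead of Pig's `Schema` and `Tuple`; `PigQueryTableSketch` and its method names are assumptions made for this example and do not exist in the commit.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch, not part of this commit: mirrors how a %pig.query
// paragraph becomes a script and how rows become %table text.
class PigQueryTableSketch {

  // '-' is not valid in a pig alias, so it is replaced with '_'.
  static String aliasFor(String paragraphId) {
    return "paragraph_" + paragraphId.replace("-", "_");
  }

  // Assign the last line of the paragraph to the generated alias.
  static String buildScript(String text, String alias) {
    String[] lines = text.split("\n");
    lines[lines.length - 1] = alias + " = " + lines[lines.length - 1];
    return String.join("\n", lines);
  }

  // Render a tab-separated header and at most maxResult rows.
  static String renderTable(List<String> header, List<List<Object>> rows, int maxResult) {
    StringBuilder sb = new StringBuilder("%table ");
    sb.append(String.join("\t", header)).append("\n");
    int index = 0;
    for (List<Object> row : rows) {
      if (index >= maxResult) {
        break;
      }
      List<String> cells = new ArrayList<String>();
      for (Object cell : row) {
        cells.add(String.valueOf(cell));
      }
      sb.append(String.join("\t", cells)).append("\n");
      index++;
    }
    if (rows.size() > maxResult) {
      sb.append("\n<font color=red>Results are limited by " + maxResult + ".</font>");
    }
    return sb.toString();
  }

  public static void main(String[] args) {
    String alias = aliasFor("20161015-192656");
    System.out.println(buildScript("a = load 'data';\nforeach a generate $0;", alias));
    List<List<Object>> rows = Arrays.asList(
        Arrays.<Object>asList(1, "andy"), Arrays.<Object>asList(2, "peter"));
    System.out.println(renderTable(Arrays.asList("id", "name"), rows, 1));
  }
}
```

Only the last line is rewritten, so a paragraph is expected to end with the relation to display; earlier lines are passed through unchanged.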

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/465c51a4/pig/src/main/java/org/apache/zeppelin/pig/PigScriptListener.java
----------------------------------------------------------------------
diff --git a/pig/src/main/java/org/apache/zeppelin/pig/PigScriptListener.java b/pig/src/main/java/org/apache/zeppelin/pig/PigScriptListener.java
new file mode 100644
index 0000000..1f88b2e
--- /dev/null
+++ b/pig/src/main/java/org/apache/zeppelin/pig/PigScriptListener.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.zeppelin.pig;
+
+import org.apache.pig.impl.plan.OperatorPlan;
+import org.apache.pig.tools.pigstats.JobStats;
+import org.apache.pig.tools.pigstats.OutputStats;
+import org.apache.pig.tools.pigstats.PigProgressNotificationListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Listener that records the ids of started jobs and the latest progress of a pig script.
+ */
+public class PigScriptListener implements PigProgressNotificationListener {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(PigScriptListener.class);
+
+  private Set<String> jobIds = new HashSet<String>();
+  private int progress;
+
+  @Override
+  public void initialPlanNotification(String scriptId, OperatorPlan<?> plan) {
+
+  }
+
+  @Override
+  public void launchStartedNotification(String scriptId, int numJobsToLaunch) {
+
+  }
+
+  @Override
+  public void jobsSubmittedNotification(String scriptId, int numJobsSubmitted) {
+
+  }
+
+  @Override
+  public void jobStartedNotification(String scriptId, String assignedJobId) {
+    this.jobIds.add(assignedJobId);
+  }
+
+  @Override
+  public void jobFinishedNotification(String scriptId, JobStats jobStats) {
+
+  }
+
+  @Override
+  public void jobFailedNotification(String scriptId, JobStats jobStats) {
+
+  }
+
+  @Override
+  public void outputCompletedNotification(String scriptId, OutputStats outputStats) {
+
+  }
+
+  @Override
+  public void progressUpdatedNotification(String scriptId, int progress) {
+    LOGGER.debug("scriptId:" + scriptId + ", progress:" + progress);
+    this.progress = progress;
+  }
+
+  @Override
+  public void launchCompletedNotification(String scriptId, int numJobsSucceeded) {
+
+  }
+
+  public Set<String> getJobIds() {
+    return jobIds;
+  }
+
+  public int getProgress() {
+    return progress;
+  }
+}
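
Both interpreters put a `PigScriptListener` into `listenerMap` under the paragraph id and remove it in `finally`. How the base class consumes that map is not shown in this part of the diff, so the following is only a sketch under that assumption: a per-paragraph registry from which progress can be looked up while a script is running. `ListenerRegistrySketch` and `ProgressHolder` are hypothetical stand-ins, not classes from this commit.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch, not part of this commit: a registry of progress
// holders keyed by paragraph id.
class ListenerRegistrySketch {

  // Stand-in for PigScriptListener: keeps only the latest progress value.
  static class ProgressHolder {
    private volatile int progress;

    void progressUpdated(int value) {
      this.progress = value;
    }

    int getProgress() {
      return progress;
    }
  }

  private final Map<String, ProgressHolder> listenerMap =
      new ConcurrentHashMap<String, ProgressHolder>();

  // Called before the script is registered with the pig server.
  ProgressHolder register(String paragraphId) {
    ProgressHolder holder = new ProgressHolder();
    listenerMap.put(paragraphId, holder);
    return holder;
  }

  // Called from a finally block once the script has finished or failed.
  void unregister(String paragraphId) {
    listenerMap.remove(paragraphId);
  }

  // Report 0 when no script is running for the paragraph.
  int getProgress(String paragraphId) {
    ProgressHolder holder = listenerMap.get(paragraphId);
    return holder == null ? 0 : holder.getProgress();
  }

  public static void main(String[] args) {
    ListenerRegistrySketch registry = new ListenerRegistrySketch();
    registry.register("paragraph_1").progressUpdated(40);
    System.out.println(registry.getProgress("paragraph_1"));
  }
}
```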

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/465c51a4/pig/src/main/java/org/apache/zeppelin/pig/PigUtils.java
----------------------------------------------------------------------
diff --git a/pig/src/main/java/org/apache/zeppelin/pig/PigUtils.java b/pig/src/main/java/org/apache/zeppelin/pig/PigUtils.java
new file mode 100644
index 0000000..d444e02
--- /dev/null
+++ b/pig/src/main/java/org/apache/zeppelin/pig/PigUtils.java
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.pig;
+
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.pig.PigRunner;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezExecType;
+import org.apache.pig.tools.pigstats.InputStats;
+import org.apache.pig.tools.pigstats.JobStats;
+import org.apache.pig.tools.pigstats.OutputStats;
+import org.apache.pig.tools.pigstats.PigStats;
+import org.apache.pig.tools.pigstats.mapreduce.MRJobStats;
+import org.apache.pig.tools.pigstats.mapreduce.SimplePigStats;
+import org.apache.pig.tools.pigstats.tez.TezDAGStats;
+import org.apache.pig.tools.pigstats.tez.TezPigScriptStats;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Utility methods for writing temporary pig scripts and extracting job stats and job ids.
+ */
+public class PigUtils {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(PigUtils.class);
+
+  protected static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
+
+  public static File createTempPigScript(String content) throws IOException {
+    File tmpFile = File.createTempFile("zeppelin", "pig");
+    LOGGER.debug("Create pig script file:" + tmpFile.getAbsolutePath());
+    try (FileWriter writer = new FileWriter(tmpFile)) {
+      IOUtils.write(content, writer);
+    }
+    return tmpFile.getAbsoluteFile();
+  }
+
+  public static File createTempPigScript(List<String> lines) throws IOException {
+    return createTempPigScript(StringUtils.join(lines, "\n"));
+  }
+
+  public static String extactJobStats(PigStats stats) {
+    if (stats instanceof SimplePigStats) {
+      return extractFromSimplePigStats((SimplePigStats) stats);
+    } else if (stats instanceof TezPigScriptStats) {
+      return extractFromTezPigStats((TezPigScriptStats) stats);
+    } else {
+      throw new RuntimeException("Unrecognized stats type:" + stats.getClass().getSimpleName());
+    }
+  }
+
+  public static String extractFromSimplePigStats(SimplePigStats stats) {
+
+    try {
+      Field userIdField = PigStats.class.getDeclaredField("userId");
+      userIdField.setAccessible(true);
+      String userId = (String) (userIdField.get(stats));
+      Field startTimeField = PigStats.class.getDeclaredField("startTime");
+      startTimeField.setAccessible(true);
+      long startTime = (Long) (startTimeField.get(stats));
+      Field endTimeField = PigStats.class.getDeclaredField("endTime");
+      endTimeField.setAccessible(true);
+      long endTime = (Long) (endTimeField.get(stats));
+
+      if (stats.getReturnCode() == PigRunner.ReturnCode.UNKNOWN) {
+        LOGGER.warn("unknown return code, can't display the results");
+        return null;
+      }
+      if (stats.getPigContext() == null) {
+        LOGGER.warn("unknown exec type, don't display the results");
+        return null;
+      }
+
+      SimpleDateFormat sdf = new SimpleDateFormat(DATE_FORMAT);
+      StringBuilder sb = new StringBuilder();
+      sb.append("\nHadoopVersion\tPigVersion\tUserId\tStartedAt\tFinishedAt\tFeatures\n");
+      sb.append(stats.getHadoopVersion()).append("\t").append(stats.getPigVersion()).append("\t")
+              .append(userId).append("\t")
+              .append(sdf.format(new Date(startTime))).append("\t")
+              .append(sdf.format(new Date(endTime))).append("\t")
+              .append(stats.getFeatures()).append("\n");
+      sb.append("\n");
+      if (stats.getReturnCode() == PigRunner.ReturnCode.SUCCESS) {
+        sb.append("Success!\n");
+      } else if (stats.getReturnCode() == PigRunner.ReturnCode.PARTIAL_FAILURE) {
+        sb.append("Some jobs have failed! Stop running all dependent jobs\n");
+      } else {
+        sb.append("Failed!\n");
+      }
+      sb.append("\n");
+
+      Field jobPlanField = PigStats.class.getDeclaredField("jobPlan");
+      jobPlanField.setAccessible(true);
+      PigStats.JobGraph jobPlan = (PigStats.JobGraph) jobPlanField.get(stats);
+
+      if (stats.getReturnCode() == PigRunner.ReturnCode.SUCCESS
+              || stats.getReturnCode() == PigRunner.ReturnCode.PARTIAL_FAILURE) {
+        sb.append("Job Stats (time in seconds):\n");
+        sb.append(MRJobStats.SUCCESS_HEADER).append("\n");
+        List<JobStats> arr = jobPlan.getSuccessfulJobs();
+        for (JobStats js : arr) {
+          sb.append(js.getDisplayString());
+        }
+        sb.append("\n");
+      }
+      if (stats.getReturnCode() == PigRunner.ReturnCode.FAILURE
+              || stats.getReturnCode() == PigRunner.ReturnCode.PARTIAL_FAILURE) {
+        sb.append("Failed Jobs:\n");
+        sb.append(MRJobStats.FAILURE_HEADER).append("\n");
+        List<JobStats> arr = jobPlan.getFailedJobs();
+        for (JobStats js : arr) {
+          sb.append(js.getDisplayString());
+        }
+        sb.append("\n");
+      }
+      sb.append("Input(s):\n");
+      for (InputStats is : stats.getInputStats()) {
+        sb.append(is.getDisplayString());
+      }
+      sb.append("\n");
+      sb.append("Output(s):\n");
+      for (OutputStats ds : stats.getOutputStats()) {
+        sb.append(ds.getDisplayString());
+      }
+
+      sb.append("\nCounters:\n");
+      sb.append("Total records written : " + stats.getRecordWritten()).append("\n");
+      sb.append("Total bytes written : " + stats.getBytesWritten()).append("\n");
+      sb.append("Spillable Memory Manager spill count : "
+              + stats.getSMMSpillCount()).append("\n");
+      sb.append("Total bags proactively spilled: "
+              + stats.getProactiveSpillCountObjects()).append("\n");
+      sb.append("Total records proactively spilled: "
+              + stats.getProactiveSpillCountRecords()).append("\n");
+      sb.append("\nJob DAG:\n").append(jobPlan.toString());
+
+      return "Script Statistics: \n" + sb.toString();
+    } catch (Exception e) {
+      LOGGER.error("Can not extract message from SimplePigStats", e);
+      return "Can not extract message from SimplePigStats," + ExceptionUtils.getStackTrace(e);
+    }
+  }
+
+  private static String extractFromTezPigStats(TezPigScriptStats stats) {
+
+    try {
+      Field userIdField = PigStats.class.getDeclaredField("userId");
+      userIdField.setAccessible(true);
+      String userId = (String) (userIdField.get(stats));
+      Field startTimeField = PigStats.class.getDeclaredField("startTime");
+      startTimeField.setAccessible(true);
+      long startTime = (Long) (startTimeField.get(stats));
+      Field endTimeField = PigStats.class.getDeclaredField("endTime");
+      endTimeField.setAccessible(true);
+      long endTime = (Long) (endTimeField.get(stats));
+
+      SimpleDateFormat sdf = new SimpleDateFormat(DATE_FORMAT);
+      StringBuilder sb = new StringBuilder();
+      sb.append("\n");
+      sb.append(String.format("%1$20s: %2$-100s%n", "HadoopVersion", stats.getHadoopVersion()));
+      sb.append(String.format("%1$20s: %2$-100s%n", "PigVersion", stats.getPigVersion()));
+      sb.append(String.format("%1$20s: %2$-100s%n", "TezVersion", TezExecType.getTezVersion()));
+      sb.append(String.format("%1$20s: %2$-100s%n", "UserId", userId));
+      sb.append(String.format("%1$20s: %2$-100s%n", "FileName", stats.getFileName()));
+      sb.append(String.format("%1$20s: %2$-100s%n", "StartedAt", sdf.format(new Date(startTime))));
+      sb.append(String.format("%1$20s: %2$-100s%n", "FinishedAt", sdf.format(new Date(endTime))));
+      sb.append(String.format("%1$20s: %2$-100s%n", "Features", stats.getFeatures()));
+      sb.append("\n");
+      if (stats.getReturnCode() == PigRunner.ReturnCode.SUCCESS) {
+        sb.append("Success!\n");
+      } else if (stats.getReturnCode() == PigRunner.ReturnCode.PARTIAL_FAILURE) {
+        sb.append("Some tasks have failed! Stop running all dependent tasks\n");
+      } else {
+        sb.append("Failed!\n");
+      }
+      sb.append("\n");
+
+      // Print diagnostic info in case of failure
+      if (stats.getReturnCode() == PigRunner.ReturnCode.FAILURE
+              || stats.getReturnCode() == PigRunner.ReturnCode.PARTIAL_FAILURE) {
+        if (stats.getErrorMessage() != null) {
+          String[] lines = stats.getErrorMessage().split("\n");
+          for (int i = 0; i < lines.length; i++) {
+            String s = lines[i].trim();
+            if (i == 0 || !StringUtils.isEmpty(s)) {
+              sb.append(String.format("%1$20s: %2$-100s%n", i == 0 ? "ErrorMessage" : "", s));
+            }
+          }
+          sb.append("\n");
+        }
+      }
+
+      Field tezDAGStatsMapField = TezPigScriptStats.class.getDeclaredField("tezDAGStatsMap");
+      tezDAGStatsMapField.setAccessible(true);
+      Map<String, TezDAGStats> tezDAGStatsMap =
+              (Map<String, TezDAGStats>) tezDAGStatsMapField.get(stats);
+      int count = 0;
+      for (TezDAGStats dagStats : tezDAGStatsMap.values()) {
+        sb.append("\n");
+        sb.append("DAG " + count++ + ":\n");
+        sb.append(dagStats.getDisplayString());
+        sb.append("\n");
+      }
+
+      sb.append("Input(s):\n");
+      for (InputStats is : stats.getInputStats()) {
+        sb.append(is.getDisplayString().trim()).append("\n");
+      }
+      sb.append("\n");
+      sb.append("Output(s):\n");
+      for (OutputStats os : stats.getOutputStats()) {
+        sb.append(os.getDisplayString().trim()).append("\n");
+      }
+      return "Script Statistics:\n" + sb.toString();
+    } catch (Exception e) {
+      LOGGER.error("Can not extract message from TezPigScriptStats", e);
+      return "Can not extract message from TezPigScriptStats," + ExceptionUtils.getStackTrace(e);
+    }
+  }
+
+  public static List<String> extractJobIds(PigStats stat) {
+    if (stat instanceof SimplePigStats) {
+      return extractJobIdsFromSimplePigStats((SimplePigStats) stat);
+    } else if (stat instanceof TezPigScriptStats) {
+      return extractJobIdsFromTezPigStats((TezPigScriptStats) stat);
+    } else {
+      throw new RuntimeException("Unrecognized stats type:" + stat.getClass().getSimpleName());
+    }
+  }
+
+  public static List<String> extractJobIdsFromSimplePigStats(SimplePigStats stat) {
+    List<String> jobIds = new ArrayList<>();
+    try {
+      Field jobPlanField = PigStats.class.getDeclaredField("jobPlan");
+      jobPlanField.setAccessible(true);
+      PigStats.JobGraph jobPlan = (PigStats.JobGraph) jobPlanField.get(stat);
+      List<JobStats> arr = jobPlan.getJobList();
+      for (JobStats js : arr) {
+        jobIds.add(js.getJobId());
+      }
+      return jobIds;
+    } catch (Exception e) {
+      LOGGER.error("Can not extract jobIds from SimplePigStats", e);
+      throw new RuntimeException("Can not extract jobIds from SimplePigStats", e);
+    }
+  }
+
+  public static List<String> extractJobIdsFromTezPigStats(TezPigScriptStats stat) {
+    List<String> jobIds = new ArrayList<>();
+    try {
+      Field tezDAGStatsMapField = TezPigScriptStats.class.getDeclaredField("tezDAGStatsMap");
+      tezDAGStatsMapField.setAccessible(true);
+      Map<String, TezDAGStats> tezDAGStatsMap =
+              (Map<String, TezDAGStats>) tezDAGStatsMapField.get(stat);
+      for (TezDAGStats dagStats : tezDAGStatsMap.values()) {
+        LOGGER.debug("Tez JobId:" + dagStats.getJobId());
+        jobIds.add(dagStats.getJobId());
+      }
+      return jobIds;
+    } catch (Exception e) {
+      LOGGER.error("Can not extract jobIds from TezPigScriptStats", e);
+      throw new RuntimeException("Can not extract jobIds from TezPigScriptStats", e);
+    }
+  }
+}
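
`PigUtils` reads fields such as `userId`, `startTime` and `jobPlan` through reflection because it has no public accessor for them. The sketch below shows the same `getDeclaredField` / `setAccessible` / `get` sequence on a stand-in class. `PrivateFieldSketch` and `Stats` are hypothetical and exist only for this illustration.

```java
import java.lang.reflect.Field;

// Hypothetical sketch, not part of this commit: reads a non-public field
// through reflection, the technique PigUtils applies to PigStats.
class PrivateFieldSketch {

  // Stand-in for PigStats with a field that has no public getter.
  static class Stats {
    private long startTime = 1476559616000L;
  }

  static long readStartTime(Stats stats) {
    try {
      Field field = Stats.class.getDeclaredField("startTime");
      field.setAccessible(true);
      // Field.get boxes the primitive, so cast to Long before unboxing.
      return (Long) field.get(stats);
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("Can not read startTime", e);
    }
  }

  public static void main(String[] args) {
    System.out.println(readStartTime(new Stats()));
  }
}
```

Reflection of this kind depends on field names that are not part of a public API, so it may break when the underlying library renames or removes them.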

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/465c51a4/pig/src/main/resources/interpreter-setting.json
----------------------------------------------------------------------
diff --git a/pig/src/main/resources/interpreter-setting.json b/pig/src/main/resources/interpreter-setting.json
new file mode 100644
index 0000000..27918ed
--- /dev/null
+++ b/pig/src/main/resources/interpreter-setting.json
@@ -0,0 +1,46 @@
+[
+  {
+    "group": "pig",
+    "name": "script",
+    "className": "org.apache.zeppelin.pig.PigInterpreter",
+    "properties": {
+      "zeppelin.pig.execType": {
+        "envName": null,
+        "propertyName": "zeppelin.pig.execType",
+        "defaultValue": "mapreduce",
+        "description": "local | mapreduce | tez"
+      },
+      "zeppelin.pig.includeJobStats": {
+        "envName": null,
+        "propertyName": "zeppelin.pig.includeJobStats",
+        "defaultValue": "false",
+        "description": "flag to include job stats in output"
+      }
+    },
+    "editor": {
+      "language": "pig"
+    }
+  },
+  {
+    "group": "pig",
+    "name": "query",
+    "className": "org.apache.zeppelin.pig.PigQueryInterpreter",
+    "properties": {
+      "zeppelin.pig.execType": {
+        "envName": null,
+        "propertyName": "zeppelin.pig.execType",
+        "defaultValue": "mapreduce",
+        "description": "local | mapreduce | tez"
+      },
+      "zeppelin.pig.maxResult": {
+        "envName": null,
+        "propertyName": "zeppelin.pig.maxResult",
+        "defaultValue": "1000",
+        "description": "max row number for %pig.query"
+      }
+    },
+    "editor": {
+      "language": "pig"
+    }
+  }
+]
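
The settings above declare `zeppelin.pig.maxResult` with a default of `1000`. As a small sketch of how such a property could be read defensively with plain `java.util.Properties`, consider the following. `MaxResultSketch` is a hypothetical helper, and the commit itself calls `Integer.parseInt(getProperty(...))` without an explicit fallback.

```java
import java.util.Properties;

// Hypothetical sketch, not part of this commit: reads the row limit and
// falls back to the default declared in interpreter-setting.json.
class MaxResultSketch {

  static final String KEY = "zeppelin.pig.maxResult";
  static final String DEFAULT = "1000";

  static int maxResult(Properties properties) {
    return Integer.parseInt(properties.getProperty(KEY, DEFAULT).trim());
  }

  public static void main(String[] args) {
    Properties properties = new Properties();
    System.out.println(maxResult(properties));
    properties.setProperty(KEY, "20");
    System.out.println(maxResult(properties));
  }
}
```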

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/465c51a4/pig/src/test/java/org/apache/zeppelin/pig/PigInterpreterTest.java
----------------------------------------------------------------------
diff --git a/pig/src/test/java/org/apache/zeppelin/pig/PigInterpreterTest.java b/pig/src/test/java/org/apache/zeppelin/pig/PigInterpreterTest.java
new file mode 100644
index 0000000..3d062d6
--- /dev/null
+++ b/pig/src/test/java/org/apache/zeppelin/pig/PigInterpreterTest.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.zeppelin.pig;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.InterpreterResult.Code;
+import org.apache.zeppelin.interpreter.InterpreterResult.Type;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class PigInterpreterTest {
+
+  private PigInterpreter pigInterpreter;
+  private InterpreterContext context;
+
+  @Before
+  public void setUp() {
+    Properties properties = new Properties();
+    properties.put("zeppelin.pig.execType", "local");
+    pigInterpreter = new PigInterpreter(properties);
+    pigInterpreter.open();
+    context = new InterpreterContext(null, "paragraph_id", null, null, null, null, null, null, null,
+            null, null);
+  }
+
+  @After
+  public void tearDown() {
+    pigInterpreter.close();
+  }
+
+  @Test
+  public void testBasics() throws IOException {
+    String content = "1\tandy\n"
+            + "2\tpeter\n";
+    File tmpFile = File.createTempFile("zeppelin", "test");
+    FileWriter writer = new FileWriter(tmpFile);
+    IOUtils.write(content, writer);
+    writer.close();
+
+    // simple pig script using dump
+    String pigscript = "a = load '" + tmpFile.getAbsolutePath() + "';"
+            + "dump a;";
+    InterpreterResult result = pigInterpreter.interpret(pigscript, context);
+    assertEquals(Type.TEXT, result.type());
+    assertEquals(Code.SUCCESS, result.code());
+    assertTrue(result.message().contains("(1,andy)\n(2,peter)"));
+
+    // describe
+    pigscript = "a = load '" + tmpFile.getAbsolutePath() + "' as (id: int, name: bytearray);"
+            + "describe a;";
+    result = pigInterpreter.interpret(pigscript, context);
+    assertEquals(Type.TEXT, result.type());
+    assertEquals(Code.SUCCESS, result.code());
+    assertTrue(result.message().contains("a: {id: int,name: bytearray}"));
+
+    // syntax error (compilation error)
+    pigscript = "a = loa '" + tmpFile.getAbsolutePath() + "';"
+            + "describe a;";
+    result = pigInterpreter.interpret(pigscript, context);
+    assertEquals(Type.TEXT, result.type());
+    assertEquals(Code.ERROR, result.code());
+    assertTrue(result.message().contains("Syntax error, unexpected symbol at or near 'a'"));
+
+    // execution error
+    pigscript = "a = load 'invalid_path';"
+            + "dump a;";
+    result = pigInterpreter.interpret(pigscript, context);
+    assertEquals(Type.TEXT, result.type());
+    assertEquals(Code.ERROR, result.code());
+    assertTrue(result.message().contains("Input path does not exist"));
+  }
+
+
+  @Test
+  public void testIncludeJobStats() throws IOException {
+    Properties properties = new Properties();
+    properties.put("zeppelin.pig.execType", "local");
+    properties.put("zeppelin.pig.includeJobStats", "true");
+    pigInterpreter = new PigInterpreter(properties);
+    pigInterpreter.open();
+
+    String content = "1\tandy\n"
+            + "2\tpeter\n";
+    File tmpFile = File.createTempFile("zeppelin", "test");
+    FileWriter writer = new FileWriter(tmpFile);
+    IOUtils.write(content, writer);
+    writer.close();
+
+    // simple pig script using dump
+    String pigscript = "a = load '" + tmpFile.getAbsolutePath() + "';"
+            + "dump a;";
+    InterpreterResult result = pigInterpreter.interpret(pigscript, context);
+    assertEquals(Type.TEXT, result.type());
+    assertEquals(Code.SUCCESS, result.code());
+    assertTrue(result.message().contains("Counters:"));
+    assertTrue(result.message().contains("(1,andy)\n(2,peter)"));
+
+    // describe
+    pigscript = "a = load '" + tmpFile.getAbsolutePath() + "' as (id: int, name: bytearray);"
+            + "describe a;";
+    result = pigInterpreter.interpret(pigscript, context);
+    assertEquals(Type.TEXT, result.type());
+    assertEquals(Code.SUCCESS, result.code());
+    // no job is launched, so no jobStats
+    assertTrue(!result.message().contains("Counters:"));
+    assertTrue(result.message().contains("a: {id: int,name: bytearray}"));
+
+    // syntax error (compilation error)
+    pigscript = "a = loa '" + tmpFile.getAbsolutePath() + "';"
+            + "describe a;";
+    result = pigInterpreter.interpret(pigscript, context);
+    assertEquals(Type.TEXT, result.type());
+    assertEquals(Code.ERROR, result.code());
+    // no job is launched, so no jobStats
+    assertTrue(!result.message().contains("Counters:"));
+    assertTrue(result.message().contains("Syntax error, unexpected symbol at or near 'a'"));
+
+    // execution error
+    pigscript = "a = load 'invalid_path';"
+            + "dump a;";
+    result = pigInterpreter.interpret(pigscript, context);
+    assertEquals(Type.TEXT, result.type());
+    assertEquals(Code.ERROR, result.code());
+    assertTrue(result.message().contains("Counters:"));
+    assertTrue(result.message().contains("Input path does not exist"));
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/465c51a4/pig/src/test/java/org/apache/zeppelin/pig/PigQueryInterpreterTest.java
----------------------------------------------------------------------
diff --git a/pig/src/test/java/org/apache/zeppelin/pig/PigQueryInterpreterTest.java b/pig/src/test/java/org/apache/zeppelin/pig/PigQueryInterpreterTest.java
new file mode 100644
index 0000000..00ece44
--- /dev/null
+++ b/pig/src/test/java/org/apache/zeppelin/pig/PigQueryInterpreterTest.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.pig;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterGroup;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit tests for {@link PigQueryInterpreter}, run alongside a {@link PigInterpreter}.
+ */
+public class PigQueryInterpreterTest {
+
+  private PigInterpreter pigInterpreter;
+  private PigQueryInterpreter pigQueryInterpreter;
+  private InterpreterContext context;
+
+  @Before
+  public void setUp() {
+    Properties properties = new Properties();
+    properties.put("zeppelin.pig.execType", "local");
+    properties.put("zeppelin.pig.maxResult", "20");
+
+    pigInterpreter = new PigInterpreter(properties);
+    pigQueryInterpreter = new PigQueryInterpreter(properties);
+    List<Interpreter> interpreters = new ArrayList<>();
+    interpreters.add(pigInterpreter);
+    interpreters.add(pigQueryInterpreter);
+    InterpreterGroup group = new InterpreterGroup();
+    group.put("note_id", interpreters);
+    pigInterpreter.setInterpreterGroup(group);
+    pigQueryInterpreter.setInterpreterGroup(group);
+    pigInterpreter.open();
+    pigQueryInterpreter.open();
+
+    context = new InterpreterContext(null, "paragraph_id", null, null, null, null, null, null, null,
+            null, null);
+  }
+
+  @After
+  public void tearDown() {
+    pigInterpreter.close();
+    pigQueryInterpreter.close();
+  }
+
+  @Test
+  public void testBasics() throws IOException {
+    String content = "andy\tmale\t10\n"
+            + "peter\tmale\t20\n"
+            + "amy\tfemale\t14\n";
+    File tmpFile = File.createTempFile("zeppelin", "test");
+    FileWriter writer = new FileWriter(tmpFile);
+    IOUtils.write(content, writer);
+    writer.close();
+
+    // run script in PigInterpreter
+    String pigscript = "a = load '" + tmpFile.getAbsolutePath() + "' as (name, gender, age);\n"
+            + "a2 = load 'invalid_path' as (name, gender, age);\n"
+            + "dump a;";
+    InterpreterResult result = pigInterpreter.interpret(pigscript, context);
+    assertEquals(InterpreterResult.Type.TEXT, result.type());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertTrue(result.message().contains("(andy,male,10)\n(peter,male,20)\n(amy,female,14)"));
+
+    // run single line query in PigQueryInterpreter
+    String query = "foreach a generate name, age;";
+    result = pigQueryInterpreter.interpret(query, context);
+    assertEquals(InterpreterResult.Type.TABLE, result.type());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertEquals("name\tage\nandy\t10\npeter\t20\namy\t14\n", result.message());
+
+    // run multiple line query in PigQueryInterpreter
+    query = "b = group a by gender;\nforeach b generate group as gender, COUNT($1) as count;";
+    result = pigQueryInterpreter.interpret(query, context);
+    assertEquals(InterpreterResult.Type.TABLE, result.type());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertEquals("gender\tcount\nmale\t2\nfemale\t1\n", result.message());
+
+    // syntax error in PigQueryInterpreter
+    query = "b = group a by invalid_column;\nforeach b generate group as gender, COUNT($1) as count;";
+    result = pigQueryInterpreter.interpret(query, context);
+    assertEquals(InterpreterResult.Type.TEXT, result.type());
+    assertEquals(InterpreterResult.Code.ERROR, result.code());
+    assertTrue(result.message().contains("Projected field [invalid_column] does not exist in schema"));
+
+    // execution error in PigQueryInterpreter
+    query = "foreach a2 generate name, age;";
+    result = pigQueryInterpreter.interpret(query, context);
+    assertEquals(InterpreterResult.Type.TEXT, result.type());
+    assertEquals(InterpreterResult.Code.ERROR, result.code());
+    assertTrue(result.message().contains("Input path does not exist"));
+  }
+
+  @Test
+  public void testMaxResult() throws IOException {
+    StringBuilder content = new StringBuilder();
+    for (int i = 0; i < 30; ++i) {
+      content.append(i + "\tname_" + i + "\n");
+    }
+    File tmpFile = File.createTempFile("zeppelin", "test");
+    FileWriter writer = new FileWriter(tmpFile);
+    IOUtils.write(content, writer);
+    writer.close();
+
+    // run script in PigInterpreter
+    String pigscript = "a = load '" + tmpFile.getAbsolutePath() + "' as (id, name);";
+    InterpreterResult result = pigInterpreter.interpret(pigscript, context);
+    assertEquals(InterpreterResult.Type.TEXT, result.type());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    // empty output
+    assertTrue(result.message().isEmpty());
+
+    // run single line query in PigQueryInterpreter
+    String query = "foreach a generate id;";
+    result = pigQueryInterpreter.interpret(query, context);
+    assertEquals(InterpreterResult.Type.TABLE, result.type());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertTrue(result.message().contains("id\n0\n1\n2"));
+    assertTrue(result.message().contains("Results are limited by 20"));
+  }
+}
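
The `testMaxResult` case above expects a table result to contain the header, the first rows, and a "Results are limited by 20" notice when 30 rows are available and `zeppelin.pig.maxResult` is 20. The following is only a minimal sketch of that truncation behaviour, not the actual `PigQueryInterpreter` code: the class `MaxResultSketch` and the helper `toTable` are hypothetical names, and the exact placement and formatting of the notice in the real interpreter are assumptions (the test only checks for the substring).

```java
import java.util.ArrayList;
import java.util.List;

public class MaxResultSketch {

  // Hypothetical helper: renders a header line followed by at most
  // maxResult rows, one per line. A notice is appended only when
  // rows were actually dropped.
  public static String toTable(String header, List<String> rows, int maxResult) {
    StringBuilder sb = new StringBuilder(header).append('\n');
    int shown = Math.min(rows.size(), maxResult);
    for (int i = 0; i < shown; i++) {
      sb.append(rows.get(i)).append('\n');
    }
    if (rows.size() > maxResult) {
      // Assumed wording, taken from the substring the test asserts on.
      sb.append("Results are limited by ").append(maxResult).append('\n');
    }
    return sb.toString();
  }

  public static void main(String[] args) {
    // Same shape as the test data: 30 ids, limit of 20.
    List<String> rows = new ArrayList<>();
    for (int i = 0; i < 30; i++) {
      rows.add(String.valueOf(i));
    }
    System.out.print(toTable("id", rows, 20));
  }
}
```

With fewer rows than the limit, the sketch emits no notice, which matches the exact-equality assertions in `testBasics`.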

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/465c51a4/pig/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/pig/src/test/resources/log4j.properties b/pig/src/test/resources/log4j.properties
new file mode 100644
index 0000000..8daee59
--- /dev/null
+++ b/pig/src/test/resources/log4j.properties
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+log4j.rootLogger = INFO, stdout
+
+log4j.appender.stdout = org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%5p [%d] ({%t} %F[%M]:%L) - %m%n

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/465c51a4/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 03b2263..558ce06 100644
--- a/pom.xml
+++ b/pom.xml
@@ -62,6 +62,7 @@
     <module>shell</module>
     <module>livy</module>
     <module>hbase</module>
+    <module>pig</module>
     <module>postgresql</module>
     <module>jdbc</module>
     <module>file</module>

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/465c51a4/zeppelin-distribution/src/bin_license/LICENSE
----------------------------------------------------------------------
diff --git a/zeppelin-distribution/src/bin_license/LICENSE b/zeppelin-distribution/src/bin_license/LICENSE
index 2ee668a..e39a2ad 100644
--- a/zeppelin-distribution/src/bin_license/LICENSE
+++ b/zeppelin-distribution/src/bin_license/LICENSE
@@ -156,7 +156,15 @@ The following components are provided under Apache License.
     (Apache 2.0) Tachyon Project Core (org.tachyonproject:tachyon:0.6.4 - http://tachyonproject.org/tachyon/)
     (Apache 2.0) Tachyon Project Client (org.tachyonproject:tachyon-client:0.6.4 - http://tachyonproject.org/tachyon-client/)
     (Apache 2.0) javax.inject (javax.inject:javax.inject:1 - http://code.google.com/p/atinject/)
-     
+    (Apache 2.0) Apache Pig (org.apache.pig:pig:0.16 - http://pig.apache.org)
+    (Apache 2.0) tez-api (org.apache.tez:tez-api:0.7.0 - http://tez.apache.org)
+    (Apache 2.0) tez-common (org.apache.tez:tez-common:0.7.0 - http://tez.apache.org)
+    (Apache 2.0) tez-dag (org.apache.tez:tez-dag:0.7.0 - http://tez.apache.org)
+    (Apache 2.0) tez-runtime-library (org.apache.tez:tez-runtime-library:0.7.0 - http://tez.apache.org)
+    (Apache 2.0) tez-runtime-internals (org.apache.tez:tez-runtime-internals:0.7.0 - http://tez.apache.org)
+    (Apache 2.0) tez-mapreduce (org.apache.tez:tez-mapreduce:0.7.0 - http://tez.apache.org)
+    (Apache 2.0) tez-yarn-timeline-history-with-acls (org.apache.tez:tez-yarn-timeline-history-with-acls:0.7.0 - http://tez.apache.org)
+
 ========================================================================
 MIT licenses
 ========================================================================

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/465c51a4/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
index d819869..414aed2 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
@@ -521,6 +521,8 @@ public class ZeppelinConfiguration extends XMLConfiguration {
         + "org.apache.zeppelin.alluxio.AlluxioInterpreter,"
         + "org.apache.zeppelin.file.HDFSFileInterpreter,"
         + "org.apache.zeppelin.postgresql.PostgreSqlInterpreter,"
+        + "org.apache.zeppelin.pig.PigInterpreter,"
+        + "org.apache.zeppelin.pig.PigQueryInterpreter,"
         + "org.apache.zeppelin.flink.FlinkInterpreter,"
         + "org.apache.zeppelin.python.PythonInterpreter,"
         + "org.apache.zeppelin.python.PythonInterpreterPandasSql,"
@@ -543,7 +545,7 @@ public class ZeppelinConfiguration extends XMLConfiguration {
     ZEPPELIN_INTERPRETER_MAX_POOL_SIZE("zeppelin.interpreter.max.poolsize", 10),
     ZEPPELIN_INTERPRETER_GROUP_ORDER("zeppelin.interpreter.group.order", "spark,md,angular,sh,"
         + "livy,alluxio,file,psql,flink,python,ignite,lens,cassandra,geode,kylin,elasticsearch,"
-        + "scalding,jdbc,hbase,bigquery,beam"),
+        + "scalding,jdbc,hbase,bigquery,beam,pig"),
     ZEPPELIN_ENCODING("zeppelin.encoding", "UTF-8"),
     ZEPPELIN_NOTEBOOK_DIR("zeppelin.notebook.dir", "notebook"),
     // use specified notebook (id) as homescreen