You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ar...@apache.org on 2017/11/28 10:19:50 UTC

[2/2] drill git commit: DRILL-4779: Kafka storage plugin (Kamesh Bhallamudi & Anil Kumar Batchu)

DRILL-4779: Kafka storage plugin (Kamesh Bhallamudi & Anil Kumar Batchu)

closes #1052


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/d3f8da2b
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/d3f8da2b
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/d3f8da2b

Branch: refs/heads/master
Commit: d3f8da2b62b13be28c4133980067942cec2a5faf
Parents: 05d8b3c
Author: Anil Kumar Batchu <ak...@gmail.com>
Authored: Mon Nov 27 18:44:55 2017 -0800
Committer: Arina Ielchiieva <ar...@gmail.com>
Committed: Tue Nov 28 11:27:18 2017 +0200

----------------------------------------------------------------------
 contrib/pom.xml                                 |   1 +
 contrib/storage-kafka/README.md                 | 230 +++++++++++++
 contrib/storage-kafka/pom.xml                   | 101 ++++++
 .../drill/exec/store/kafka/KafkaGroupScan.java  | 319 +++++++++++++++++++
 .../exec/store/kafka/KafkaRecordReader.java     | 145 +++++++++
 .../exec/store/kafka/KafkaScanBatchCreator.java |  55 ++++
 .../drill/exec/store/kafka/KafkaScanSpec.java   |  40 +++
 .../exec/store/kafka/KafkaStoragePlugin.java    | 100 ++++++
 .../store/kafka/KafkaStoragePluginConfig.java   |  78 +++++
 .../drill/exec/store/kafka/KafkaSubScan.java    | 177 ++++++++++
 .../drill/exec/store/kafka/MessageIterator.java | 114 +++++++
 .../drill/exec/store/kafka/MetaDataField.java   |  37 +++
 .../store/kafka/decoders/JsonMessageReader.java | 104 ++++++
 .../store/kafka/decoders/MessageReader.java     |  45 +++
 .../kafka/decoders/MessageReaderFactory.java    |  63 ++++
 .../drill/exec/store/kafka/package-info.java    |  24 ++
 .../store/kafka/schema/KafkaMessageSchema.java  |  86 +++++
 .../store/kafka/schema/KafkaSchemaFactory.java  |  45 +++
 .../resources/bootstrap-storage-plugins.json    |   9 +
 .../src/main/resources/drill-module.conf        |  30 ++
 .../exec/store/kafka/KafkaMessageGenerator.java | 134 ++++++++
 .../exec/store/kafka/KafkaQueriesTest.java      | 109 +++++++
 .../drill/exec/store/kafka/KafkaTestBase.java   |  92 ++++++
 .../exec/store/kafka/MessageIteratorTest.java   | 106 ++++++
 .../drill/exec/store/kafka/QueryConstants.java  |  40 +++
 .../drill/exec/store/kafka/TestKafkaSuit.java   | 107 +++++++
 .../kafka/cluster/EmbeddedKafkaCluster.java     | 166 ++++++++++
 .../decoders/MessageReaderFactoryTest.java      |  67 ++++
 .../storage-kafka/src/test/resources/login.conf |  25 ++
 distribution/pom.xml                            |   5 +
 distribution/src/assemble/bin.xml               |   1 +
 .../org/apache/drill/exec/ExecConstants.java    |  12 +
 .../server/options/SystemOptionManager.java     |   4 +
 .../src/main/resources/drill-module.conf        |   4 +
 .../apache/drill/exec/proto/UserBitShared.java  |   9 +
 .../exec/proto/beans/CoreOperatorType.java      |   4 +-
 protocol/src/main/protobuf/UserBitShared.proto  |   1 +
 37 files changed, 2688 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/d3f8da2b/contrib/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/pom.xml b/contrib/pom.xml
index d4ad434..8588987 100644
--- a/contrib/pom.xml
+++ b/contrib/pom.xml
@@ -37,6 +37,7 @@
     <module>storage-hive</module>
     <module>storage-mongo</module>
     <module>storage-jdbc</module>
+    <module>storage-kafka</module>
     <module>storage-kudu</module>
     <module>storage-opentsdb</module>
     <module>sqlline</module>

http://git-wip-us.apache.org/repos/asf/drill/blob/d3f8da2b/contrib/storage-kafka/README.md
----------------------------------------------------------------------
diff --git a/contrib/storage-kafka/README.md b/contrib/storage-kafka/README.md
new file mode 100644
index 0000000..a63731f
--- /dev/null
+++ b/contrib/storage-kafka/README.md
@@ -0,0 +1,230 @@
+# Drill Kafka Plugin
+
+Drill kafka storage plugin allows you to perform interactive analysis using SQL against Apache Kafka.
+
+<h4 id="Supported kafka versions">Supported Kafka Version</h4>
+Kafka-0.10 and above </p>
+
+<h4 id="Supported Message Formats">Message Formats</h4>
+Currently this plugin supports reading only Kafka messages of type <strong>JSON</strong>.
+
+
+<h4>Message Readers</h4>
+<p>Message Readers are used for reading messages from Kafka. Type of the MessageReaders supported as of now are</p>
+
+<table style="width:100%">
+  <tr>
+    <th>MessageReader</th>
+    <th>Description</th>
+    <th>Key DeSerializer</th> 
+    <th>Value DeSerializer</th>
+  </tr>
+  <tr>
+    <td>JsonMessageReader</td>
+    <td>To read Json messages</td>
+    <td>org.apache.kafka.common.serialization.ByteArrayDeserializer</td> 
+    <td>org.apache.kafka.common.serialization.ByteArrayDeserializer</td>
+  </tr>
+</table>
+
+
+<h4 id="Plugin Configurations">Plugin Configurations</h4>
+Drill Kafka plugin supports following properties
+<ul>
+   <li><strong>kafkaConsumerProps</strong>: These are typical <a href="https://kafka.apache.org/documentation/#consumerconfigs">Kafka consumer properties</a>.</li>
+<li><strong>System options</strong>: These are Drill Kafka plugin  system options. <ul>
+<li><strong>store.kafka.record.reader</strong>: Message Reader implementation to use while reading messages from Kafka. Default value is  set to org.apache.drill.exec.store.kafka.decoders.JsonMessageReader
+</li>
+<li><strong>store.kafka.poll.timeout</strong>: Polling timeout used by Kafka client while fetching messages from Kafka cluster. Default value is 200 milliseconds. </li>
+</ul>
+</li>
+</ul>
+
+<h4 id="Plugin Registration">Plugin Registration</h4>
+To register the kafka plugin, open the drill web interface. To open the drill web interface, enter <strong>http://drillbit:8047/storage</strong> in your browser.
+
+<p>The following is an example plugin registration configuration</p>
+<pre>
+{
+  "type": "kafka",
+  "kafkaConsumerProps": {
+    "key.deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer",
+    "auto.offset.reset": "earliest",
+    "bootstrap.servers": "localhost:9092",
+    "group.id": "drill-query-consumer-1",
+    "enable.auto.commit": "true",
+    "value.deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer",
+    "session.timeout.ms": "30000"
+  },
+  "enabled": true
+}
+</pre>
+
+<h4 id="Abstraction"> Abstraction </h4>
+<p>In Drill, each Kafka topic is mapped to a SQL table and when a query is issued on a table, it scans all the messages from the earliest offset to the latest offset of that topic at that point of time. This plugin automatically discovers all the topics (tables), to allow you perform analysis without executing DDL statements.
+
+<h4 id="Mapping">MetaData</h4>
+This plugin also fetches the additional information about each message. The following additional fields are supported as now
+<ul>
+	<li>kafkaTopic</li>
+	<li>kafkaPartitionId</li>
+	<li>kafkaMsgOffset</li>
+	<li>kafkaMsgTimestamp</li>
+	<li>kafkaMsgKey, unless it is not null</li>
+</ul>
+
+<h4 id="Examples"> Examples </h4>
+
+Kafka topics and message offsets
+
+```
+$bin/kafka-topics --list --zookeeper localhost:2181
+clicks
+clickstream
+clickstream-json-demo
+
+$ bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic clickstream-json-demo --from-beginning | more
+{"userID":"055e9af4-8c3c-4834-8482-8e05367a7bef","sessionID":"7badf08e-1e1d-4aeb-b853-7df2df4431ac","pageName":"shoes","refferalUrl":"yelp","ipAddress":"20.44.183.126","userAgent":"Mozilla/5.0 (iPhone; CPU iPhone OS 10_3_1 like Mac OS X) AppleWebKit/603.1.30 (KHTML, like Gecko) Version/10.0 Mobile/14E304 Safari/602.1","client_ts":1509926023099}
+{"userID":"a29454b3-642d-481e-9dd8-0e0d7ef32ef5","sessionID":"b4a89204-b98c-4b4b-a1a9-f28f22d5ead3","pageName":"books","refferalUrl":"yelp","ipAddress":"252.252.113.190","userAgent":"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/51.0.2704.106 Safari/537.36 OPR/38.0.2220.41","client_ts":1509926023100}
+{"userID":"8c53b1c6-da47-4b5a-989d-61b5594f3a1d","sessionID":"baae3a1d-25b2-4955-8d07-20191f29ab32","pageName":"login","refferalUrl":"yelp","ipAddress":"110.170.214.255","userAgent":"Mozilla/5.0 (Macintosh; Intel Mac OS X x.y; rv:42.0) Gecko/20100101 Firefox/42.0","client_ts":1509926023100}
+
+$ bin/kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic clickstream-json-demo --time -2
+clickstream-json-demo:2:2765000
+clickstream-json-demo:1:2765000
+clickstream-json-demo:3:2765000
+clickstream-json-demo:0:2765000
+
+$ bin/kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic clickstream-json-demo --time -1
+clickstream-json-demo:2:2765245
+clickstream-json-demo:1:2765245
+clickstream-json-demo:3:2765245
+clickstream-json-demo:0:2765245
+
+
+```
+
+
+Drill queries on Kafka
+
+```
+$ bin/sqlline -u jdbc:drill:zk=localhost:2181
+Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=512M; support was removed in 8.0
+apache drill 1.12.0-SNAPSHOT
+"json ain't no thang"
+0: jdbc:drill:zk=localhost:2181> use kafka;
++-------+------------------------------------+
+|  ok   |              summary               |
++-------+------------------------------------+
+| true  | Default schema changed to [kafka]  |
++-------+------------------------------------+
+1 row selected (0.564 seconds)
+0: jdbc:drill:zk=localhost:2181> show tables;
++---------------+------------------------------+
+| TABLE_SCHEMA  |          TABLE_NAME          |
++---------------+------------------------------+
+| kafka         | clickstream-json-demo        |
+| kafka         | clickstream                  |
+| kafka         | clicks                       |
++---------------+------------------------------+
+17 rows selected (1.908 seconds)
+0: jdbc:drill:zk=localhost:2181> ALTER SESSION SET `store.kafka.poll.timeout` = 200;
++-------+------------------------------------+
+|  ok   |              summary               |
++-------+------------------------------------+
+| true  | store.kafka.poll.timeout updated.  |
++-------+------------------------------------+
+1 row selected (0.102 seconds)
+0: jdbc:drill:zk=localhost:2181> ALTER SESSION SET `store.kafka.record.reader` = 'org.apache.drill.exec.store.kafka.decoders.JsonMessageReader';
++-------+-------------------------------------+
+|  ok   |               summary               |
++-------+-------------------------------------+
+| true  | store.kafka.record.reader updated.  |
++-------+-------------------------------------+
+1 row selected (0.082 seconds)
+0: jdbc:drill:zk=localhost:2181> select * from kafka.`clickstream-json-demo` limit 2;
++---------------------------------------+---------------------------------------+-------------+--------------+------------------+-----------------------------------------------------------------------------------+----------------+------------------------+-------------------+-----------------+--------------------+
+|                userID                 |               sessionID               |  pageName   | refferalUrl  |    ipAddress     |                                     userAgent                                     |   client_ts    |       kafkaTopic       | kafkaPartitionId  | kafkaMsgOffset  | kafkaMsgTimestamp  |
++---------------------------------------+---------------------------------------+-------------+--------------+------------------+-----------------------------------------------------------------------------------+----------------+------------------------+-------------------+-----------------+--------------------+
+| 6b55a8fa-d0fd-41f0-94e3-7f6b551cdede  | e3bd34a8-b546-4cd5-a0c6-5438589839fc  | categories  | bing         | 198.105.119.221  | Mozilla/5.0 (Macintosh; Intel Mac OS X x.y; rv:42.0) Gecko/20100101 Firefox/42.0  | 1509926023098  | clickstream-json-demo  | 2                 | 2765000         | 1509926023098      |
+| 74cffc37-2df0-4db4-aff9-ed0027a12d03  | 339e3821-5254-4d79-bbae-69bc12808eca  | furniture   | bing         | 161.169.50.60    | Mozilla/5.0 (Windows NT 6.1; Win64; x64; rv:47.0) Gecko/20100101 Firefox/47.0     | 1509926023099  | clickstream-json-demo  | 2                 | 2765001         | 1509926023099      |
++---------------------------------------+---------------------------------------+-------------+--------------+------------------+-----------------------------------------------------------------------------------+----------------+------------------------+-------------------+-----------------+--------------------+
+2 rows selected (1.18 seconds)
+0: jdbc:drill:zk=localhost:2181> select count(*) from kafka.`clickstream-json-demo`;
++---------+
+| EXPR$0  |
++---------+
+| 980     |
++---------+
+1 row selected (0.732 seconds)
+0: jdbc:drill:zk=localhost:2181> select kafkaPartitionId, MIN(kafkaMsgOffset) as minOffset, MAX(kafkaMsgOffset) as maxOffset from kafka.`clickstream-json-demo` group by kafkaPartitionId;
++-------------------+------------+------------+
+| kafkaPartitionId  | minOffset  | maxOffset  |
++-------------------+------------+------------+
+| 2                 | 2765000    | 2765244    |
+| 1                 | 2765000    | 2765244    |
+| 3                 | 2765000    | 2765244    |
+| 0                 | 2765000    | 2765244    |
++-------------------+------------+------------+
+4 rows selected (3.081 seconds)
+0: jdbc:drill:zk=localhost:2181> select kafkaPartitionId, from_unixtime(MIN(kafkaMsgTimestamp)/1000) as minKafkaTS, from_unixtime(MAX(kafkaMsgTimestamp)/1000) as maxKafkaTs from kafka.`clickstream-json-demo` group by kafkaPartitionId;
++-------------------+----------------------+----------------------+
+| kafkaPartitionId  |      minKafkaTS      |      maxKafkaTs      |
++-------------------+----------------------+----------------------+
+| 2                 | 2017-11-05 15:53:43  | 2017-11-05 15:53:43  |
+| 1                 | 2017-11-05 15:53:43  | 2017-11-05 15:53:43  |
+| 3                 | 2017-11-05 15:53:43  | 2017-11-05 15:53:43  |
+| 0                 | 2017-11-05 15:53:43  | 2017-11-05 15:53:43  |
++-------------------+----------------------+----------------------+
+4 rows selected (2.758 seconds)
+0: jdbc:drill:zk=localhost:2181> select distinct(refferalUrl) from kafka.`clickstream-json-demo`;
++--------------+
+| refferalUrl  |
++--------------+
+| bing         |
+| yahoo        |
+| yelp         |
+| google       |
++--------------+
+4 rows selected (2.944 seconds)
+0: jdbc:drill:zk=localhost:2181> select pageName, count(*) from kafka.`clickstream-json-demo` group by pageName;
++--------------+---------+
+|   pageName   | EXPR$1  |
++--------------+---------+
+| categories   | 89      |
+| furniture    | 89      |
+| mobiles      | 89      |
+| clothing     | 89      |
+| sports       | 89      |
+| offers       | 89      |
+| shoes        | 89      |
+| books        | 89      |
+| login        | 90      |
+| electronics  | 89      |
+| toys         | 89      |
++--------------+---------+
+11 rows selected (2.493 seconds)
+
+```
+
+
+Note: 
+
+- store.kafka.record.reader system option can be used for setting record reader and default is org.apache.drill.exec.store.kafka.decoders.JsonMessageReader
+- Default store.kafka.poll.timeout is set to 200, user has to set this accordingly
+- Custom record reader can be implemented by extending org.apache.drill.exec.store.kafka.decoders.MessageReader and setting store.kafka.record.reader accordinlgy
+
+
+In case of JSON message format, following system options can be used accordingly. More details can be found in [Drill Json Model](https://drill.apache.org/docs/json-data-model/) and in [Drill system options configurations](https://drill.apache.org/docs/configuration-options-introduction/)
+
+<ui>
+  <li>ALTER SESSION SET `store.kafka.record.reader` = 'org.apache.drill.exec.store.kafka.decoders.JsonMessageReader';</li>
+  <li>ALTER SESSION SET `store.kafka.poll.timeout` = 200;</li>
+  <li>ALTER SESSION SET `exec.enable_union_type` = true; </li>
+  <li>ALTER SESSION SET `store.kafka.all_text_mode` = true;</li>
+  <li>ALTER SESSION SET `store.kafka.read_numbers_as_double` = true;</li>
+</ui>
+
+<h4 id="RoadMap">RoadMap</h4>
+ <ul>
+   <li>AVRO Message format support</li>
+ </ul>

http://git-wip-us.apache.org/repos/asf/drill/blob/d3f8da2b/contrib/storage-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/storage-kafka/pom.xml b/contrib/storage-kafka/pom.xml
new file mode 100644
index 0000000..1357a4e
--- /dev/null
+++ b/contrib/storage-kafka/pom.xml
@@ -0,0 +1,101 @@
+<?xml version="1.0"?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+  license agreements. See the NOTICE file distributed with this work for additional
+  information regarding copyright ownership. The ASF licenses this file to
+  You under the Apache License, Version 2.0 (the "License"); you may not use
+  this file except in compliance with the License. You may obtain a copy of
+  the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+  by applicable law or agreed to in writing, software distributed under the
+  License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+  OF ANY KIND, either express or implied. See the License for the specific
+  language governing permissions and limitations under the License. -->
+<project
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+  xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <artifactId>drill-contrib-parent</artifactId>
+    <groupId>org.apache.drill.contrib</groupId>
+    <version>1.12.0-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>drill-storage-kafka</artifactId>
+  <name>contrib/kafka-storage-plugin</name>
+
+  <properties>
+    <kafka.version>0.11.0.1</kafka.version>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.drill.exec</groupId>
+      <artifactId>drill-java-exec</artifactId>
+      <version>${project.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.zookeeper</groupId>
+          <artifactId>zookeeper</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+      <version>${kafka.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>log4j</groupId>
+          <artifactId>log4j</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka_2.11</artifactId>
+      <version>${kafka.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>log4j</groupId>
+          <artifactId>log4j</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <!-- Test dependencie -->
+    <dependency>
+      <groupId>org.apache.drill.exec</groupId>
+      <artifactId>drill-java-exec</artifactId>
+      <classifier>tests</classifier>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.curator</groupId>
+      <artifactId>curator-test</artifactId>
+      <version>3.3.0</version>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>log4j</groupId>
+          <artifactId>log4j</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.drill</groupId>
+      <artifactId>drill-common</artifactId>
+      <classifier>tests</classifier>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/drill/blob/d3f8da2b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java
new file mode 100644
index 0000000..e08c7d7
--- /dev/null
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.kafka.KafkaSubScan.KafkaSubScanSpec;
+import org.apache.drill.exec.store.schedule.AffinityCreator;
+import org.apache.drill.exec.store.schedule.AssignmentCreator;
+import org.apache.drill.exec.store.schedule.CompleteWork;
+import org.apache.drill.exec.store.schedule.EndpointByteMap;
+import org.apache.drill.exec.store.schedule.EndpointByteMapImpl;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+@JsonTypeName("kafka-scan")
+public class KafkaGroupScan extends AbstractGroupScan {
+
+  private static final Logger logger = LoggerFactory.getLogger(KafkaGroupScan.class);
+
+  // Assuming default average topic message size as 1KB, which will be used to
+  // compute the stats and work assignments
+  private static final long MSG_SIZE = 1024;
+
+  private final KafkaStoragePlugin kafkaStoragePlugin;
+  private final KafkaStoragePluginConfig kafkaStoragePluginConfig;
+  private List<SchemaPath> columns;
+  private final KafkaScanSpec kafkaScanSpec;
+
+  private List<PartitionScanWork> partitionWorkList;
+  private ListMultimap<Integer, PartitionScanWork> assignments;
+  private List<EndpointAffinity> affinities;
+
+  @JsonCreator
+  public KafkaGroupScan(@JsonProperty("userName") String userName,
+      @JsonProperty("kafkaStoragePluginConfig") KafkaStoragePluginConfig kafkaStoragePluginConfig,
+      @JsonProperty("columns") List<SchemaPath> columns, @JsonProperty("scanSpec") KafkaScanSpec scanSpec,
+      @JacksonInject StoragePluginRegistry pluginRegistry) {
+    this(userName, kafkaStoragePluginConfig, columns, scanSpec, (KafkaStoragePlugin) pluginRegistry);
+  }
+
+  public KafkaGroupScan(KafkaStoragePlugin kafkaStoragePlugin, KafkaScanSpec kafkaScanSpec, List<SchemaPath> columns) {
+    super(StringUtils.EMPTY);
+    this.kafkaStoragePlugin = kafkaStoragePlugin;
+    this.kafkaStoragePluginConfig = (KafkaStoragePluginConfig) kafkaStoragePlugin.getConfig();
+    this.columns = columns;
+    this.kafkaScanSpec = kafkaScanSpec;
+    init();
+  }
+
+  public KafkaGroupScan(String userName, KafkaStoragePluginConfig kafkaStoragePluginConfig, List<SchemaPath> columns,
+      KafkaScanSpec kafkaScanSpec, KafkaStoragePlugin pluginRegistry) {
+    super(userName);
+    this.kafkaStoragePluginConfig = kafkaStoragePluginConfig;
+    this.columns = columns;
+    this.kafkaScanSpec = kafkaScanSpec;
+    this.kafkaStoragePlugin = pluginRegistry;
+    init();
+  }
+
+  public KafkaGroupScan(KafkaGroupScan that) {
+    super(that);
+    this.kafkaStoragePluginConfig = that.kafkaStoragePluginConfig;
+    this.columns = that.columns;
+    this.kafkaScanSpec = that.kafkaScanSpec;
+    this.kafkaStoragePlugin = that.kafkaStoragePlugin;
+    this.partitionWorkList = that.partitionWorkList;
+    this.assignments = that.assignments;
+  }
+
+  private static class PartitionScanWork implements CompleteWork {
+
+    private final EndpointByteMapImpl byteMap = new EndpointByteMapImpl();
+
+    private final TopicPartition topicPartition;
+    private final long beginOffset;
+    private final long latestOffset;
+
+    public PartitionScanWork(TopicPartition topicPartition, long beginOffset, long latestOffset) {
+      this.topicPartition = topicPartition;
+      this.beginOffset = beginOffset;
+      this.latestOffset = latestOffset;
+    }
+
+    public TopicPartition getTopicPartition() {
+      return topicPartition;
+    }
+
+    public long getBeginOffset() {
+      return beginOffset;
+    }
+
+    public long getLatestOffset() {
+      return latestOffset;
+    }
+
+    @Override
+    public int compareTo(CompleteWork o) {
+      return Long.compare(getTotalBytes(), o.getTotalBytes());
+    }
+
+    @Override
+    public long getTotalBytes() {
+      return (latestOffset - beginOffset) * MSG_SIZE;
+    }
+
+    @Override
+    public EndpointByteMap getByteMap() {
+      return byteMap;
+    }
+
+  }
+
+  /**
+   * Computes work per topic partition, based on start and end offset of each
+   * corresponding topicPartition
+   */
+  private void init() {
+    partitionWorkList = Lists.newArrayList();
+    Collection<DrillbitEndpoint> endpoints = kafkaStoragePlugin.getContext().getBits();
+    Map<String, DrillbitEndpoint> endpointMap = Maps.newHashMap();
+    for (DrillbitEndpoint endpoint : endpoints) {
+      endpointMap.put(endpoint.getAddress(), endpoint);
+    }
+
+    Map<TopicPartition, Long> startOffsetsMap = Maps.newHashMap();
+    Map<TopicPartition, Long> endOffsetsMap = Maps.newHashMap();
+    List<PartitionInfo> topicPartitions = null;
+    String topicName = kafkaScanSpec.getTopicName();
+
+    try (KafkaConsumer<?, ?> kafkaConsumer = new KafkaConsumer<>(kafkaStoragePlugin.getConfig().getKafkaConsumerProps(),
+        new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
+      if (!kafkaConsumer.listTopics().keySet().contains(topicName)) {
+        throw UserException.dataReadError()
+            .message("Table '%s' does not exist", topicName)
+            .build(logger);
+      }
+
+      kafkaConsumer.subscribe(Arrays.asList(topicName));
+      // based on KafkaConsumer JavaDoc, seekToBeginning/seekToEnd functions
+      // evaluates lazily, seeking to the first/last offset in all partitions only
+      // when poll(long) or
+      // position(TopicPartition) are called
+      kafkaConsumer.poll(0);
+      Set<TopicPartition> assignments = kafkaConsumer.assignment();
+      topicPartitions = kafkaConsumer.partitionsFor(topicName);
+
+      // fetch start offsets for each topicPartition
+      kafkaConsumer.seekToBeginning(assignments);
+      for (TopicPartition topicPartition : assignments) {
+        startOffsetsMap.put(topicPartition, kafkaConsumer.position(topicPartition));
+      }
+
+      // fetch end offsets for each topicPartition
+      kafkaConsumer.seekToEnd(assignments);
+      for (TopicPartition topicPartition : assignments) {
+        endOffsetsMap.put(topicPartition, kafkaConsumer.position(topicPartition));
+      }
+    } catch (Exception e) {
+      throw UserException.dataReadError(e).message("Failed to fetch start/end offsets of the topic  %s", topicName)
+          .addContext(e.getMessage()).build(logger);
+    }
+
+    // computes work for each end point
+    for (PartitionInfo partitionInfo : topicPartitions) {
+      TopicPartition topicPartition = new TopicPartition(topicName, partitionInfo.partition());
+      long lastCommittedOffset = startOffsetsMap.get(topicPartition);
+      long latestOffset = endOffsetsMap.get(topicPartition);
+      logger.debug("Latest offset of {} is {}", topicPartition, latestOffset);
+      logger.debug("Last committed offset of {} is {}", topicPartition, lastCommittedOffset);
+      PartitionScanWork work = new PartitionScanWork(topicPartition, lastCommittedOffset, latestOffset);
+      Node[] inSyncReplicas = partitionInfo.inSyncReplicas();
+      for (Node isr : inSyncReplicas) {
+        String host = isr.host();
+        DrillbitEndpoint ep = endpointMap.get(host);
+        if (ep != null) {
+          work.getByteMap().add(ep, work.getTotalBytes());
+        }
+      }
+      partitionWorkList.add(work);
+    }
+  }
+
+  @Override
+  public void applyAssignments(List<DrillbitEndpoint> incomingEndpoints) {
+    assignments = AssignmentCreator.getMappings(incomingEndpoints, partitionWorkList);
+  }
+
+  @Override
+  public KafkaSubScan getSpecificScan(int minorFragmentId) {
+    List<PartitionScanWork> workList = assignments.get(minorFragmentId);
+    List<KafkaSubScanSpec> scanSpecList = Lists.newArrayList();
+
+    for (PartitionScanWork work : workList) {
+      scanSpecList.add(new KafkaSubScanSpec(work.getTopicPartition().topic(), work.getTopicPartition().partition(),
+          work.getBeginOffset(), work.getLatestOffset()));
+    }
+
+    return new KafkaSubScan(getUserName(), kafkaStoragePlugin, kafkaStoragePluginConfig, columns, scanSpecList);
+  }
+
+  @Override
+  public int getMaxParallelizationWidth() {
+    return partitionWorkList.size();
+  }
+
+  @Override
+  public ScanStats getScanStats() {
+    long messageCount = 0;
+    for (PartitionScanWork work : partitionWorkList) {
+      messageCount += (work.getLatestOffset() - work.getBeginOffset());
+    }
+    return new ScanStats(GroupScanProperty.EXACT_ROW_COUNT, messageCount, 1, messageCount * MSG_SIZE);
+  }
+
+  @Override
+  public String getDigest() {
+    return toString();
+  }
+
+  @Override
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException {
+    Preconditions.checkArgument(children.isEmpty());
+    return new KafkaGroupScan(this);
+  }
+
+  @Override
+  public List<EndpointAffinity> getOperatorAffinity() {
+    if (affinities == null) {
+      affinities = AffinityCreator.getAffinityMap(partitionWorkList);
+    }
+    return affinities;
+  }
+
+  @Override
+  @JsonIgnore
+  public boolean canPushdownProjects(List<SchemaPath> columns) {
+    return true;
+  }
+
+  @Override
+  public GroupScan clone(List<SchemaPath> columns) {
+    KafkaGroupScan clone = new KafkaGroupScan(this);
+    clone.columns = columns;
+    return clone;
+  }
+
+  @JsonProperty("kafkaStoragePluginConfig")
+  public KafkaStoragePluginConfig getStorageConfig() {
+    return this.kafkaStoragePluginConfig;
+  }
+
+  @JsonProperty
+  public List<SchemaPath> getColumns() {
+    return columns;
+  }
+
+  @JsonProperty("kafkaScanSpec")
+  public KafkaScanSpec getScanSpec() {
+    return kafkaScanSpec;
+  }
+
+  @JsonIgnore
+  public KafkaStoragePlugin getStoragePlugin() {
+    return kafkaStoragePlugin;
+  }
+
+  @Override
+  public String toString() {
+    return String.format("KafkaGroupScan [KafkaScanSpec=%s, columns=%s]", kafkaScanSpec, columns);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/d3f8da2b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java
----------------------------------------------------------------------
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java
new file mode 100644
index 0000000..f034a8a
--- /dev/null
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.store.kafka.KafkaSubScan.KafkaSubScanSpec;
+import org.apache.drill.exec.store.kafka.decoders.MessageReader;
+import org.apache.drill.exec.store.kafka.decoders.MessageReaderFactory;
+import org.apache.drill.exec.util.Utilities;
+import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+public class KafkaRecordReader extends AbstractRecordReader {
+  private static final Logger logger = LoggerFactory.getLogger(KafkaRecordReader.class);
+  public static final long DEFAULT_MESSAGES_PER_BATCH = 4000;
+
+  private VectorContainerWriter writer;
+  private MessageReader messageReader;
+
+  private final boolean unionEnabled;
+  private final KafkaStoragePlugin plugin;
+  private final KafkaSubScanSpec subScanSpec;
+  private final long kafkaPollTimeOut;
+
+  private long currentOffset;
+  private MessageIterator msgItr;
+
+  private final boolean enableAllTextMode;
+  private final boolean readNumbersAsDouble;
+  private final String kafkaMsgReader;
+
+  public KafkaRecordReader(KafkaSubScan.KafkaSubScanSpec subScanSpec, List<SchemaPath> projectedColumns,
+      FragmentContext context, KafkaStoragePlugin plugin) {
+    setColumns(projectedColumns);
+    this.enableAllTextMode = context.getOptions().getOption(ExecConstants.KAFKA_ALL_TEXT_MODE).bool_val;
+    this.readNumbersAsDouble = context.getOptions()
+        .getOption(ExecConstants.KAFKA_READER_READ_NUMBERS_AS_DOUBLE).bool_val;
+    OptionManager options = context.getOptions();
+    this.unionEnabled = options.getOption(ExecConstants.ENABLE_UNION_TYPE);
+    this.kafkaMsgReader = options.getOption(ExecConstants.KAFKA_RECORD_READER).string_val;
+    this.kafkaPollTimeOut = options.getOption(ExecConstants.KAFKA_POLL_TIMEOUT).num_val;
+    this.plugin = plugin;
+    this.subScanSpec = subScanSpec;
+  }
+
+  @Override
+  protected Collection<SchemaPath> transformColumns(Collection<SchemaPath> projectedColumns) {
+    Set<SchemaPath> transformed = Sets.newLinkedHashSet();
+    if (!isStarQuery()) {
+      for (SchemaPath column : projectedColumns) {
+        transformed.add(column);
+      }
+    } else {
+      transformed.add(Utilities.STAR_COLUMN);
+    }
+    return transformed;
+  }
+
+  @Override
+  public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException {
+    this.writer = new VectorContainerWriter(output, unionEnabled);
+    messageReader = MessageReaderFactory.getMessageReader(kafkaMsgReader);
+    messageReader.init(context.getManagedBuffer(), Lists.newArrayList(getColumns()), this.writer,
+        this.enableAllTextMode, this.readNumbersAsDouble);
+    msgItr = new MessageIterator(messageReader.getConsumer(plugin), subScanSpec, kafkaPollTimeOut);
+  }
+
+  /**
+   * KafkaConsumer.poll will fetch 500 messages per poll call. So hasNext will
+   * take care of polling multiple times for this given batch next invocation
+   */
+  @Override
+  public int next() {
+    writer.allocate();
+    writer.reset();
+    Stopwatch watch = Stopwatch.createStarted();
+    int messageCount = 0;
+
+    try {
+      while (currentOffset < subScanSpec.getEndOffset() - 1 && msgItr.hasNext()) {
+        ConsumerRecord<byte[], byte[]> consumerRecord = msgItr.next();
+        currentOffset = consumerRecord.offset();
+        writer.setPosition(messageCount);
+        messageReader.readMessage(consumerRecord);
+        if (++messageCount >= DEFAULT_MESSAGES_PER_BATCH) {
+          break;
+        }
+      }
+
+      messageReader.ensureAtLeastOneField();
+      writer.setValueCount(messageCount);
+      logger.debug("Took {} ms to process {} records.", watch.elapsed(TimeUnit.MILLISECONDS), messageCount);
+      logger.debug("Last offset consumed for {}:{} is {}", subScanSpec.getTopicName(), subScanSpec.getPartitionId(),
+          currentOffset);
+      return messageCount;
+    } catch (Exception e) {
+      String msg = "Failure while reading messages from kafka. Recordreader was at record: " + (messageCount + 1);
+      throw UserException.dataReadError(e).message(msg).addContext(e.getMessage()).build(logger);
+    }
+  }
+
+  @Override
+  public void close() throws Exception {
+    logger.info("Last offset processed for {}:{} is - {}", subScanSpec.getTopicName(), subScanSpec.getPartitionId(),
+        currentOffset);
+    logger.info("Total time to fetch messages from {}:{} is - {} milliseconds", subScanSpec.getTopicName(),
+        subScanSpec.getPartitionId(), msgItr.getTotalFetchTime());
+    messageReader.close();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/d3f8da2b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaScanBatchCreator.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaScanBatchCreator.java
new file mode 100644
index 0000000..026e341
--- /dev/null
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaScanBatchCreator.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.physical.impl.ScanBatch;
+import org.apache.drill.exec.record.CloseableRecordBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.store.RecordReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+public class KafkaScanBatchCreator implements BatchCreator<KafkaSubScan> {
+  static final Logger logger = LoggerFactory.getLogger(KafkaScanBatchCreator.class);
+
+  @Override
+  public CloseableRecordBatch getBatch(FragmentContext context, KafkaSubScan subScan, List<RecordBatch> children)
+      throws ExecutionSetupException {
+    Preconditions.checkArgument(children.isEmpty());
+    List<SchemaPath> columns = subScan.getColumns() != null ? subScan.getColumns() : GroupScan.ALL_COLUMNS;
+
+    List<RecordReader> readers = Lists.newArrayListWithCapacity(subScan.getPartitionSubScanSpecList().size());
+    for (KafkaSubScan.KafkaSubScanSpec scanSpec : subScan.getPartitionSubScanSpecList()) {
+      readers.add(new KafkaRecordReader(scanSpec, columns, context, subScan.getKafkaStoragePlugin()));
+    }
+
+    logger.info("Number of record readers initialized : {}", readers.size());
+    return new ScanBatch(subScan, context, readers);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/d3f8da2b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaScanSpec.java
----------------------------------------------------------------------
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaScanSpec.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaScanSpec.java
new file mode 100644
index 0000000..91c8fdf
--- /dev/null
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaScanSpec.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class KafkaScanSpec {
+  private String topicName;
+
+  @JsonCreator
+  public KafkaScanSpec(@JsonProperty("topicName") String topicName) {
+    this.topicName = topicName;
+  }
+
+  public String getTopicName() {
+    return topicName;
+  }
+
+  @Override
+  public String toString() {
+    return "KafkaScanSpec [topicName=" + topicName + "]";
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/d3f8da2b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaStoragePlugin.java
----------------------------------------------------------------------
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaStoragePlugin.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaStoragePlugin.java
new file mode 100644
index 0000000..8986ea7
--- /dev/null
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaStoragePlugin.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.drill.common.JSONOptions;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.OptimizerRulesContext;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.AbstractStoragePlugin;
+import org.apache.drill.exec.store.SchemaConfig;
+import org.apache.drill.exec.store.StoragePluginOptimizerRule;
+import org.apache.drill.exec.store.kafka.schema.KafkaSchemaFactory;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.io.Closer;
+
+public class KafkaStoragePlugin extends AbstractStoragePlugin {
+
+  private static final Logger logger = LoggerFactory.getLogger(KafkaStoragePlugin.class);
+  private final KafkaSchemaFactory kafkaSchemaFactory;
+  private final KafkaStoragePluginConfig config;
+  private final DrillbitContext context;
+  private final Closer closer = Closer.create();
+
+  public KafkaStoragePlugin(KafkaStoragePluginConfig config, DrillbitContext context, String name)
+      throws ExecutionSetupException {
+    logger.debug("Initializing {}", KafkaStoragePlugin.class.getName());
+    this.config = config;
+    this.context = context;
+    this.kafkaSchemaFactory = new KafkaSchemaFactory(this, name);
+  }
+
+  public DrillbitContext getContext() {
+    return this.context;
+  }
+
+  @Override
+  public KafkaStoragePluginConfig getConfig() {
+    return this.config;
+  }
+
+  @Override
+  public boolean supportsRead() {
+    return true;
+  }
+
+  @Override
+  public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
+    this.kafkaSchemaFactory.registerSchemas(schemaConfig, parent);
+  }
+
+  @Override
+  public Set<StoragePluginOptimizerRule> getPhysicalOptimizerRules(OptimizerRulesContext optimizerRulesContext) {
+    return ImmutableSet.of();
+  }
+
+  @Override
+  public AbstractGroupScan getPhysicalScan(String userName,
+      JSONOptions selection) throws IOException {
+    KafkaScanSpec kafkaScanSpec = selection.getListWith(new ObjectMapper(),
+        new TypeReference<KafkaScanSpec>() {
+        });
+    return new KafkaGroupScan(this, kafkaScanSpec, null);
+  }
+
+  public KafkaConsumer<byte[], byte[]> registerConsumer(KafkaConsumer<byte[], byte[]> consumer) {
+    return closer.register(consumer);
+  }
+
+  @Override
+  public void close() throws IOException {
+    closer.close();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/d3f8da2b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaStoragePluginConfig.java
----------------------------------------------------------------------
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaStoragePluginConfig.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaStoragePluginConfig.java
new file mode 100644
index 0000000..94afa5f
--- /dev/null
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaStoragePluginConfig.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+@JsonTypeName(KafkaStoragePluginConfig.NAME)
+public class KafkaStoragePluginConfig extends StoragePluginConfig {
+
+  private static final Logger logger = LoggerFactory.getLogger(KafkaStoragePluginConfig.class);
+  public static final String NAME = "kafka";
+  private Properties kafkaConsumerProps;
+
+  @JsonCreator
+  public KafkaStoragePluginConfig(@JsonProperty("kafkaConsumerProps") Map<String, String> kafkaConsumerProps) {
+    this.kafkaConsumerProps = new Properties();
+    this.kafkaConsumerProps.putAll(kafkaConsumerProps);
+    logger.debug("Kafka Consumer Props {}", this.kafkaConsumerProps);
+  }
+
+  public Properties getKafkaConsumerProps() {
+    return kafkaConsumerProps;
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + ((kafkaConsumerProps == null) ? 0 : kafkaConsumerProps.hashCode());
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null) {
+      return false;
+    }
+    if (getClass() != obj.getClass()) {
+      return false;
+    }
+    KafkaStoragePluginConfig other = (KafkaStoragePluginConfig) obj;
+    if (kafkaConsumerProps == null && other.kafkaConsumerProps == null) {
+      return true;
+    }
+    if (kafkaConsumerProps == null || other.kafkaConsumerProps == null) {
+      return false;
+    }
+    return kafkaConsumerProps.equals(other.kafkaConsumerProps);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/d3f8da2b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaSubScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaSubScan.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaSubScan.java
new file mode 100644
index 0000000..fc110b5
--- /dev/null
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaSubScan.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.AbstractBase;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+
+@JsonTypeName("kafka-partition-scan")
+public class KafkaSubScan extends AbstractBase implements SubScan {
+
+  @JsonProperty
+  private final KafkaStoragePluginConfig KafkaStoragePluginConfig;
+
+  @JsonIgnore
+  private final KafkaStoragePlugin kafkaStoragePlugin;
+  private final List<SchemaPath> columns;
+  private final List<KafkaSubScanSpec> partitions;
+
+  @JsonCreator
+  public KafkaSubScan(@JacksonInject StoragePluginRegistry registry, @JsonProperty("userName") String userName,
+      @JsonProperty("kafkaStoragePluginConfig") KafkaStoragePluginConfig kafkaStoragePluginConfig,
+      @JsonProperty("columns") List<SchemaPath> columns,
+      @JsonProperty("partitionSubScanSpecList") LinkedList<KafkaSubScanSpec> partitions)
+      throws ExecutionSetupException {
+    super(userName);
+    this.KafkaStoragePluginConfig = kafkaStoragePluginConfig;
+    this.columns = columns;
+    this.partitions = partitions;
+    this.kafkaStoragePlugin = (KafkaStoragePlugin) registry.getPlugin(kafkaStoragePluginConfig);
+  }
+
+  public KafkaSubScan(String userName, KafkaStoragePlugin plugin, KafkaStoragePluginConfig kafkStoragePluginConfig,
+      List<SchemaPath> columns, List<KafkaSubScanSpec> partitionSubScanSpecList) {
+    super(userName);
+    this.columns = columns;
+    this.KafkaStoragePluginConfig = kafkStoragePluginConfig;
+    this.kafkaStoragePlugin = plugin;
+    this.partitions = partitionSubScanSpecList;
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
+    return physicalVisitor.visitSubScan(this, value);
+  }
+
+  @Override
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException {
+    Preconditions.checkArgument(children.isEmpty());
+    return new KafkaSubScan(getUserName(), kafkaStoragePlugin, KafkaStoragePluginConfig, columns,
+        partitions);
+  }
+
+  @Override
+  public Iterator<PhysicalOperator> iterator() {
+    return Collections.emptyIterator();
+  }
+
+  @JsonIgnore
+  public KafkaStoragePluginConfig getKafkaStoragePluginConfig() {
+    return KafkaStoragePluginConfig;
+  }
+
+  @JsonIgnore
+  public KafkaStoragePlugin getKafkaStoragePlugin() {
+    return kafkaStoragePlugin;
+  }
+
+  public List<SchemaPath> getColumns() {
+    return columns;
+  }
+
+  public List<KafkaSubScanSpec> getPartitionSubScanSpecList() {
+    return partitions;
+  }
+
+  @Override
+  public int getOperatorType() {
+    return CoreOperatorType.KAFKA_SUB_SCAN_VALUE;
+  }
+
+  public static class KafkaSubScanSpec {
+    protected String topicName;
+    protected int partitionId;
+    protected long startOffset;
+    protected long endOffset;
+
+    @JsonCreator
+    public KafkaSubScanSpec(@JsonProperty("topicName") String topicName, @JsonProperty("partitionId") int partitionId,
+        @JsonProperty("startOffset") long startOffset, @JsonProperty("endOffset") long endOffset) {
+      this.topicName = topicName;
+      this.partitionId = partitionId;
+      this.startOffset = startOffset;
+      this.endOffset = endOffset;
+    }
+
+    KafkaSubScanSpec() {
+
+    }
+
+    public String getTopicName() {
+      return topicName;
+    }
+
+    public int getPartitionId() {
+      return partitionId;
+    }
+
+    public long getStartOffset() {
+      return startOffset;
+    }
+
+    public long getEndOffset() {
+      return endOffset;
+    }
+
+    public KafkaSubScanSpec setTopicName(String topicName) {
+      this.topicName = topicName;
+      return this;
+    }
+
+    public KafkaSubScanSpec setPartitionId(int partitionId) {
+      this.partitionId = partitionId;
+      return this;
+    }
+
+    public KafkaSubScanSpec setStartOffset(long startOffset) {
+      this.startOffset = startOffset;
+      return this;
+    }
+
+    public KafkaSubScanSpec setEndOffset(long endOffset) {
+      this.endOffset = endOffset;
+      return this;
+    }
+
+    @Override
+    public String toString() {
+      return "KafkaSubScanSpec [topicName=" + topicName + ", partitionId=" + partitionId + ", startOffset="
+          + startOffset + ", endOffset=" + endOffset + "]";
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/d3f8da2b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/MessageIterator.java
----------------------------------------------------------------------
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/MessageIterator.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/MessageIterator.java
new file mode 100644
index 0000000..3afb1b8
--- /dev/null
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/MessageIterator.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.store.kafka.KafkaSubScan.KafkaSubScanSpec;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+
+import kafka.common.KafkaException;
+
+public class MessageIterator implements Iterator<ConsumerRecord<byte[], byte[]>> {
+
+  private static final Logger logger = LoggerFactory.getLogger(MessageIterator.class);
+  private final KafkaConsumer<byte[], byte[]> kafkaConsumer;
+  private Iterator<ConsumerRecord<byte[], byte[]>> recordIter;
+  private final TopicPartition topicPartition;
+  private long totalFetchTime = 0;
+  private final long kafkaPollTimeOut;
+  private final long endOffset;
+
+  public MessageIterator(final KafkaConsumer<byte[], byte[]> kafkaConsumer, final KafkaSubScanSpec subScanSpec,
+      final long kafkaPollTimeOut) {
+    this.kafkaConsumer = kafkaConsumer;
+    this.kafkaPollTimeOut = kafkaPollTimeOut;
+
+    List<TopicPartition> partitions = Lists.newArrayListWithCapacity(1);
+    topicPartition = new TopicPartition(subScanSpec.getTopicName(), subScanSpec.getPartitionId());
+    partitions.add(topicPartition);
+    this.kafkaConsumer.assign(partitions);
+    logger.info("Start offset of {}:{} is - {}", subScanSpec.getTopicName(), subScanSpec.getPartitionId(),
+        subScanSpec.getStartOffset());
+    this.kafkaConsumer.seek(topicPartition, subScanSpec.getStartOffset());
+    this.endOffset = subScanSpec.getEndOffset();
+  }
+
+  @Override
+  public void remove() {
+    throw new UnsupportedOperationException("Does not support remove operation");
+  }
+
+  @Override
+  public boolean hasNext() {
+    if (recordIter != null && recordIter.hasNext()) {
+      return true;
+    }
+
+    long nextPosition = kafkaConsumer.position(topicPartition);
+    if (nextPosition >= endOffset) {
+      return false;
+    }
+
+    ConsumerRecords<byte[], byte[]> consumerRecords = null;
+    Stopwatch stopwatch = Stopwatch.createStarted();
+    try {
+      consumerRecords = kafkaConsumer.poll(kafkaPollTimeOut);
+    } catch (KafkaException ke) {
+      logger.error(ke.getMessage(), ke);
+      throw UserException.dataReadError(ke).message(ke.getMessage()).build(logger);
+    }
+    stopwatch.stop();
+
+    if (consumerRecords.isEmpty()) {
+      String errorMsg = new StringBuilder().append("Failed to fetch messages within ").append(kafkaPollTimeOut)
+          .append(" milliseconds. Consider increasing the value of the property : ")
+          .append(ExecConstants.KAFKA_POLL_TIMEOUT).toString();
+      throw UserException.dataReadError().message(errorMsg).build(logger);
+    }
+
+    long lastFetchTime = stopwatch.elapsed(TimeUnit.MILLISECONDS);
+    logger.debug("Total number of messages fetched : {}", consumerRecords.count());
+    logger.debug("Time taken to fetch : {} milliseconds", lastFetchTime);
+    totalFetchTime += lastFetchTime;
+
+    recordIter = consumerRecords.iterator();
+    return recordIter.hasNext();
+  }
+
+  public long getTotalFetchTime() {
+    return this.totalFetchTime;
+  }
+
+  @Override
+  public ConsumerRecord<byte[], byte[]> next() {
+    return recordIter.next();
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/d3f8da2b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/MetaDataField.java
----------------------------------------------------------------------
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/MetaDataField.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/MetaDataField.java
new file mode 100644
index 0000000..cdaee9b
--- /dev/null
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/MetaDataField.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+/**
+ * MetaData fields provide additional information about each message.
+ * It is expected that one should not modify the fieldName of each constant as it breaks the compatibility.
+ */
+public enum MetaDataField {
+  KAFKA_TOPIC("kafkaTopic"), KAFKA_PARTITION_ID("kafkaPartitionId"), KAFKA_OFFSET("kafkaMsgOffset"), KAFKA_TIMESTAMP(
+      "kafkaMsgTimestamp"), KAFKA_MSG_KEY("kafkaMsgKey");
+
+  private final String fieldName;
+
+  private MetaDataField(final String fieldName) {
+    this.fieldName = fieldName;
+  }
+
+  public String getFieldName() {
+    return fieldName;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/d3f8da2b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.java
----------------------------------------------------------------------
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.java
new file mode 100644
index 0000000..9ad6107
--- /dev/null
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka.decoders;
+
+import static org.apache.drill.exec.store.kafka.MetaDataField.KAFKA_MSG_KEY;
+import static org.apache.drill.exec.store.kafka.MetaDataField.KAFKA_OFFSET;
+import static org.apache.drill.exec.store.kafka.MetaDataField.KAFKA_PARTITION_ID;
+import static org.apache.drill.exec.store.kafka.MetaDataField.KAFKA_TIMESTAMP;
+import static org.apache.drill.exec.store.kafka.MetaDataField.KAFKA_TOPIC;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.store.kafka.KafkaStoragePlugin;
+import org.apache.drill.exec.vector.complex.fn.JsonReader;
+import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Charsets;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+
+import io.netty.buffer.DrillBuf;
+
+/**
+ * MessageReader class which will convert ConsumerRecord into JSON and writes to
+ * VectorContainerWriter of JsonReader
+ *
+ */
+public class JsonMessageReader implements MessageReader {
+
+  private static final Logger logger = LoggerFactory.getLogger(JsonMessageReader.class);
+  private JsonReader jsonReader;
+  private VectorContainerWriter writer;
+
+  @Override
+  public void init(DrillBuf buf, List<SchemaPath> columns, VectorContainerWriter writer, boolean allTextMode,
+      boolean readNumbersAsDouble) {
+    // set skipOuterList to false as it doesn't applicable for JSON records and it's only applicable for JSON files.
+    this.jsonReader = new JsonReader(buf, columns, allTextMode, false, readNumbersAsDouble);
+    this.writer = writer;
+  }
+
+  @Override
+  public void readMessage(ConsumerRecord<?, ?> record) {
+    try {
+      byte[] recordArray = (byte[]) record.value();
+      JsonObject jsonObj = (new JsonParser()).parse(new String(recordArray, Charsets.UTF_8)).getAsJsonObject();
+      jsonObj.addProperty(KAFKA_TOPIC.getFieldName(), record.topic());
+      jsonObj.addProperty(KAFKA_PARTITION_ID.getFieldName(), record.partition());
+      jsonObj.addProperty(KAFKA_OFFSET.getFieldName(), record.offset());
+      jsonObj.addProperty(KAFKA_TIMESTAMP.getFieldName(), record.timestamp());
+      jsonObj.addProperty(KAFKA_MSG_KEY.getFieldName(), record.key() != null ? record.key().toString() : null);
+      jsonReader.setSource(jsonObj.toString().getBytes(Charsets.UTF_8));
+      jsonReader.write(writer);
+    } catch (IOException e) {
+      throw UserException.dataReadError(e).message(e.getMessage())
+          .addContext("MessageReader", JsonMessageReader.class.getName()).build(logger);
+    }
+  }
+
+  @Override
+  public void ensureAtLeastOneField() {
+    jsonReader.ensureAtLeastOneField(writer);
+  }
+
+  @Override
+  public KafkaConsumer<byte[], byte[]> getConsumer(KafkaStoragePlugin plugin) {
+    return plugin.registerConsumer(new KafkaConsumer<>(plugin.getConfig().getKafkaConsumerProps(),
+        new ByteArrayDeserializer(), new ByteArrayDeserializer()));
+  }
+
+  @Override
+  public void close() throws IOException {
+    this.writer.clear();
+    try {
+      this.writer.close();
+    } catch (Exception e) {
+      logger.warn("Error while closing JsonMessageReader", e);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/d3f8da2b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/MessageReader.java
----------------------------------------------------------------------
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/MessageReader.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/MessageReader.java
new file mode 100644
index 0000000..510a520
--- /dev/null
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/MessageReader.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka.decoders;
+
+import java.io.Closeable;
+import java.util.List;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.store.kafka.KafkaStoragePlugin;
+import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+
+import io.netty.buffer.DrillBuf;
+
+/**
+ * MessageReader interface provides mechanism to handle various Kafka Message
+ * Formats like JSON, AVRO or custom message formats.
+ */
+public interface MessageReader extends Closeable {
+
+  public void init(DrillBuf buf, List<SchemaPath> columns, VectorContainerWriter writer, boolean allTextMode,
+      boolean readNumbersAsDouble);
+
+  public void readMessage(ConsumerRecord<?, ?> message);
+
+  public void ensureAtLeastOneField();
+
+  public KafkaConsumer<byte[], byte[]> getConsumer(KafkaStoragePlugin plugin);
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/d3f8da2b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/MessageReaderFactory.java
----------------------------------------------------------------------
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/MessageReaderFactory.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/MessageReaderFactory.java
new file mode 100644
index 0000000..cd83f96
--- /dev/null
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/MessageReaderFactory.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka.decoders;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MessageReaderFactory {
+
+  private static final Logger logger = LoggerFactory.getLogger(MessageReaderFactory.class);
+
+  /**
+   * Initialize kafka message reader beased on store.kafka.record.reader session
+   * property
+   *
+   * @param messageReaderKlass
+   *          value of store.kafka.record.reader session property
+   * @return kafka message reader
+   * @throws UserException
+   *           in case of any message reader initialization
+   */
+  public static MessageReader getMessageReader(String messageReaderKlass) {
+    if (messageReaderKlass == null) {
+      throw UserException.validationError()
+          .message("Please configure message reader implementation using the property 'store.kafka.record.reader'")
+          .build(logger);
+    }
+
+    MessageReader messageReader = null;
+    try {
+      Class<?> klass = Class.forName(messageReaderKlass);
+      if (MessageReader.class.isAssignableFrom(klass)) {
+        messageReader = (MessageReader) klass.newInstance();
+        logger.info("Initialized Message Reader : {}", messageReader);
+      }
+    } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
+      throw UserException.validationError().message("Failed to initialize message reader : %s", messageReaderKlass)
+          .build(logger);
+    }
+
+    if (messageReader == null) {
+      throw UserException.validationError().message("Message reader configured '%s' does not implement '%s'",
+          messageReaderKlass, MessageReader.class.getName()).build(logger);
+    }
+    return messageReader;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/d3f8da2b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/package-info.java
----------------------------------------------------------------------
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/package-info.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/package-info.java
new file mode 100644
index 0000000..53c3493
--- /dev/null
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/package-info.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Kafka storage plugin.
+ *
+ * Enables querying Kafka as a data store, supported for Avro and Json message
+ * types.
+ */
+package org.apache.drill.exec.store.kafka;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/d3f8da2b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/schema/KafkaMessageSchema.java
----------------------------------------------------------------------
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/schema/KafkaMessageSchema.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/schema/KafkaMessageSchema.java
new file mode 100644
index 0000000..65b532d
--- /dev/null
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/schema/KafkaMessageSchema.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka.schema;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.Table;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.planner.logical.DrillTable;
+import org.apache.drill.exec.planner.logical.DynamicDrillTable;
+import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.exec.store.kafka.KafkaScanSpec;
+import org.apache.drill.exec.store.kafka.KafkaStoragePlugin;
+import org.apache.drill.exec.store.kafka.KafkaStoragePluginConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Maps;
+
+public class KafkaMessageSchema extends AbstractSchema {
+
+  private static final Logger logger = LoggerFactory.getLogger(KafkaMessageSchema.class);
+  private final KafkaStoragePlugin plugin;
+  private final Map<String, DrillTable> drillTables = Maps.newHashMap();
+  private Set<String> tableNames;
+
+  public KafkaMessageSchema(final KafkaStoragePlugin plugin, final String name) {
+    super(ImmutableList.<String> of(), name);
+    this.plugin = plugin;
+  }
+
+  @Override
+  public String getTypeName() {
+    return KafkaStoragePluginConfig.NAME;
+  }
+
+  void setHolder(SchemaPlus plusOfThis) {
+    for (String s : getSubSchemaNames()) {
+      plusOfThis.add(s, getSubSchema(s));
+    }
+  }
+
+  @Override
+  public Table getTable(String tableName) {
+    if (!drillTables.containsKey(tableName)) {
+      KafkaScanSpec scanSpec = new KafkaScanSpec(tableName);
+      DrillTable table = new DynamicDrillTable(plugin, getName(), scanSpec);
+      drillTables.put(tableName, table);
+    }
+
+    return drillTables.get(tableName);
+  }
+
+  @Override
+  public Set<String> getTableNames() {
+    if (tableNames == null) {
+      try (KafkaConsumer<?, ?> kafkaConsumer = new KafkaConsumer<>(plugin.getConfig().getKafkaConsumerProps())) {
+        tableNames = kafkaConsumer.listTopics().keySet();
+      } catch(KafkaException e) {
+        throw UserException.dataReadError(e).message("Failed to get tables information").addContext(e.getMessage())
+            .build(logger);
+      }
+    }
+    return tableNames;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/d3f8da2b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/schema/KafkaSchemaFactory.java
----------------------------------------------------------------------
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/schema/KafkaSchemaFactory.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/schema/KafkaSchemaFactory.java
new file mode 100644
index 0000000..8f44a93
--- /dev/null
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/schema/KafkaSchemaFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka.schema;
+
+import java.io.IOException;
+
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.drill.exec.store.SchemaConfig;
+import org.apache.drill.exec.store.SchemaFactory;
+import org.apache.drill.exec.store.kafka.KafkaStoragePlugin;
+
+public class KafkaSchemaFactory implements SchemaFactory {
+
+  private final String schemaName;
+  private final KafkaStoragePlugin plugin;
+
+  public KafkaSchemaFactory(KafkaStoragePlugin plugin, String schemaName) {
+    this.plugin = plugin;
+    this.schemaName = schemaName;
+  }
+
+  @Override
+  public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent)
+      throws IOException {
+    KafkaMessageSchema schema = new KafkaMessageSchema(this.plugin, this.schemaName);
+    SchemaPlus hPlus = parent.add(schemaName, schema);
+    schema.setHolder(hPlus);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/d3f8da2b/contrib/storage-kafka/src/main/resources/bootstrap-storage-plugins.json
----------------------------------------------------------------------
diff --git a/contrib/storage-kafka/src/main/resources/bootstrap-storage-plugins.json b/contrib/storage-kafka/src/main/resources/bootstrap-storage-plugins.json
new file mode 100644
index 0000000..406c030
--- /dev/null
+++ b/contrib/storage-kafka/src/main/resources/bootstrap-storage-plugins.json
@@ -0,0 +1,9 @@
+{
+  "storage":{
+    kafka : {
+      type:"kafka",
+      enabled: false,
+      kafkaConsumerProps: {"bootstrap.servers":"localhost:9092", "group.id" : "drill-consumer"}
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/d3f8da2b/contrib/storage-kafka/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/contrib/storage-kafka/src/main/resources/drill-module.conf b/contrib/storage-kafka/src/main/resources/drill-module.conf
new file mode 100644
index 0000000..4b813a4
--- /dev/null
+++ b/contrib/storage-kafka/src/main/resources/drill-module.conf
@@ -0,0 +1,30 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+//  This file tells Drill to consider this module when class path scanning.
+//  This file can also include any supplementary configuration information.
+//  This file is in HOCON format, see https://github.com/typesafehub/config/blob/master/HOCON.md for more information.
+drill.classpath.scanning: {
+  packages += "org.apache.drill.exec.store.kafka"
+}
+drill.exec: {
+
+  sys.store.provider: {
+    kafka : {
+      "kafkaConsumerProps" : "{\"bootstrap.servers\" : \"localhost:9092\"}"
+    }
+  }
+
+}