You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by jh...@apache.org on 2016/08/28 08:09:41 UTC

tajo git commit: TAJO-1487: Kafka Scanner for kafka storage.

Repository: tajo
Updated Branches:
  refs/heads/master ff4673c5b -> 4e2d21c2e


TAJO-1487: Kafka Scanner for kafka storage.

Closes #1043

Signed-off-by: Jinho Kim <jh...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/4e2d21c2
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/4e2d21c2
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/4e2d21c2

Branch: refs/heads/master
Commit: 4e2d21c2e9de69430293c7a46bb71f1070accce8
Parents: ff4673c
Author: Byunghwa Yun <co...@combineads.co.kr>
Authored: Sun Aug 28 17:07:50 2016 +0900
Committer: Jinho Kim <jh...@apache.org>
Committed: Sun Aug 28 17:07:50 2016 +0900

----------------------------------------------------------------------
 CHANGES                                         |   2 +
 .../java/org/apache/tajo/BuiltinStorages.java   |   1 +
 tajo-project/pom.xml                            |   2 +-
 .../storage/fragment/BuiltinFragmentKinds.java  |   1 +
 tajo-storage/tajo-storage-kafka/pom.xml         |  61 +++++
 .../tajo/storage/kafka/KafkaFragment.java       | 176 ++++++++++++
 .../tajo/storage/kafka/KafkaFragmentSerde.java  |  61 +++++
 .../apache/tajo/storage/kafka/KafkaScanner.java | 273 +++++++++++++++++++
 .../storage/kafka/KafkaStorageConstants.java    |  32 +++
 .../src/main/proto/StorageFragmentProtos.proto  |  37 +++
 .../tajo/storage/kafka/TestKafkaScanner.java    |  99 +++++++
 11 files changed, 744 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/4e2d21c2/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 991398b..5e7fc0f 100644
--- a/CHANGES
+++ b/CHANGES
@@ -408,6 +408,8 @@ Release 0.12.0 - unreleased
 
   SUB TASKS
 
+    TAJO-1487: Kafka Scanner for kafka strage. (Byunghwa Yun via jinho)
+
     TAJO-2174: Implement HiveCatalogStore#alterTable. (Lee Dongjin via jinho)
 
     TAJO-1480: Kafka Consumer for kafka strage. 

http://git-wip-us.apache.org/repos/asf/tajo/blob/4e2d21c2/tajo-common/src/main/java/org/apache/tajo/BuiltinStorages.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/BuiltinStorages.java b/tajo-common/src/main/java/org/apache/tajo/BuiltinStorages.java
index aa7a9e7..180bd4f 100644
--- a/tajo-common/src/main/java/org/apache/tajo/BuiltinStorages.java
+++ b/tajo-common/src/main/java/org/apache/tajo/BuiltinStorages.java
@@ -32,4 +32,5 @@ public class BuiltinStorages {
   public static final String HBASE = "HBASE";
   public static final String SYSTEM = "SYSTEM";
   public static final String EX_HTTP_JSON = "EX_HTTP_JSON";
+  public static final String KAFKA = "KAFKA";
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/4e2d21c2/tajo-project/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-project/pom.xml b/tajo-project/pom.xml
index 075e6e7..e6a018c 100644
--- a/tajo-project/pom.xml
+++ b/tajo-project/pom.xml
@@ -40,7 +40,7 @@
     <jersey.version>2.6</jersey.version>
     <jetty.version>6.1.26</jetty.version>
     <parquet.version>1.8.1</parquet.version>
-    <kafka.version>0.10.0.0</kafka.version>
+    <kafka.version>0.10.0.1</kafka.version>
     <tajo.root>${project.parent.relativePath}/..</tajo.root>
     <extra.source.path>src/main/hadoop-${hadoop.version}</extra.source.path>
   </properties>

http://git-wip-us.apache.org/repos/asf/tajo/blob/4e2d21c2/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/BuiltinFragmentKinds.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/BuiltinFragmentKinds.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/BuiltinFragmentKinds.java
index cc7eea5..72df3f5 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/BuiltinFragmentKinds.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/BuiltinFragmentKinds.java
@@ -23,4 +23,5 @@ public class BuiltinFragmentKinds {
   public static final String HBASE = "HBASE";
   public static final String JDBC = "JDBC";
   public static final String HTTP = "EXAMPLE-HTTP";
+  public static final String KAFKA = "KAFKA";
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/4e2d21c2/tajo-storage/tajo-storage-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-kafka/pom.xml b/tajo-storage/tajo-storage-kafka/pom.xml
index 136af4a..82a55ff 100644
--- a/tajo-storage/tajo-storage-kafka/pom.xml
+++ b/tajo-storage/tajo-storage-kafka/pom.xml
@@ -81,6 +81,67 @@
       </plugin>
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-antrun-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>create-protobuf-generated-sources-directory</id>
+            <phase>initialize</phase>
+            <configuration>
+              <target>
+                <mkdir dir="target/generated-sources/proto" />
+              </target>
+            </configuration>
+            <goals>
+              <goal>run</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>exec-maven-plugin</artifactId>
+        <version>1.2</version>
+        <executions>
+          <execution>
+            <id>generate-sources</id>
+            <phase>generate-sources</phase>
+            <configuration>
+              <executable>protoc</executable>
+              <arguments>
+                <argument>-Isrc/main/proto/</argument>
+                <argument>--proto_path=../../tajo-common/src/main/proto</argument>
+                <argument>--proto_path=../../tajo-catalog/tajo-catalog-common/src/main/proto</argument>
+                <argument>--java_out=target/generated-sources/proto</argument>
+                <argument>src/main/proto/StorageFragmentProtos.proto</argument>
+              </arguments>
+            </configuration>
+            <goals>
+              <goal>exec</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>build-helper-maven-plugin</artifactId>
+        <version>1.5</version>
+        <executions>
+          <execution>
+            <id>add-source</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>add-source</goal>
+            </goals>
+            <configuration>
+              <sources>
+                <source>target/generated-sources/proto</source>
+              </sources>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-jar-plugin</artifactId>
         <executions>
           <execution>

http://git-wip-us.apache.org/repos/asf/tajo/blob/4e2d21c2/tajo-storage/tajo-storage-kafka/src/main/java/org/apache/tajo/storage/kafka/KafkaFragment.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-kafka/src/main/java/org/apache/tajo/storage/kafka/KafkaFragment.java b/tajo-storage/tajo-storage-kafka/src/main/java/org/apache/tajo/storage/kafka/KafkaFragment.java
new file mode 100644
index 0000000..4008a25
--- /dev/null
+++ b/tajo-storage/tajo-storage-kafka/src/main/java/org/apache/tajo/storage/kafka/KafkaFragment.java
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.kafka;
+
+import org.apache.tajo.storage.fragment.BuiltinFragmentKinds;
+import org.apache.tajo.storage.fragment.Fragment;
+import org.apache.tajo.storage.kafka.KafkaFragment.KafkaFragmentKey;
+
+import java.net.URI;
+
+import com.google.common.base.Objects;
+
+/**
+ * Fragment for Kafka
+ */
+public class KafkaFragment extends Fragment<KafkaFragmentKey> {
+  private String topicName;
+  private boolean last;
+
+  public KafkaFragment(URI uri, String tableName, String topicName, long startOffset, long lastOffset,
+      int partitionId, String leaderHost) {
+    super(BuiltinFragmentKinds.KAFKA, uri, tableName, new KafkaFragmentKey(partitionId, startOffset),
+        new KafkaFragmentKey(partitionId, lastOffset), lastOffset - startOffset, new String[] { leaderHost });
+
+    this.topicName = topicName;
+    this.last = false;
+  }
+
+  public KafkaFragment(URI uri, String tableName, String topicName, long startOffset, long lastOffset,
+      int partitionId, String leaderHost, boolean last) {
+    this(uri, tableName, topicName, startOffset, lastOffset, partitionId, leaderHost);
+    this.last = last;
+  }
+
+  public Object clone() throws CloneNotSupportedException {
+    KafkaFragment frag = (KafkaFragment) super.clone();
+    frag.topicName = topicName;
+    frag.last = last;
+    return frag;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (o instanceof KafkaFragment) {
+      KafkaFragment t = (KafkaFragment) o;
+      if (inputSourceId.equals(t.inputSourceId) && topicName.equals(t.topicName)
+        && getStartKey().equals(t.getStartKey()) && getEndKey().equals(t.getEndKey())) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(inputSourceId, topicName, getStartKey(), getEndKey());
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder builder = new StringBuilder();
+    builder.append("\"fragment\": {\"topicName\":");
+    builder.append(topicName);
+    builder.append(", \"uri\":");
+    builder.append(uri);
+    builder.append(", \"inputSourceId\":");
+    builder.append(inputSourceId);
+    builder.append(", \"startKey\":");
+    builder.append(startKey);
+    builder.append(", \"endKey\":");
+    builder.append(endKey);
+    builder.append(", \"length\":");
+    builder.append(length);
+    builder.append("}");
+    return builder.toString();
+  }
+
+  public boolean isLast() {
+    return last;
+  }
+
+  public void setLast(boolean last) {
+    this.last = last;
+  }
+
+  public String getTopicName() {
+    return this.topicName;
+  }
+
+  public void setStartKey(int partitionId, long startOffset) {
+    this.startKey = new KafkaFragmentKey(partitionId, startOffset);
+  }
+
+  public void setEndKey(int partitionId, long lastOffset) {
+    this.endKey = new KafkaFragmentKey(partitionId, lastOffset);
+  }
+
+  public static class KafkaFragmentKey implements Comparable<KafkaFragmentKey> {
+    private final int partitionId;
+    private final long offset;
+
+    public KafkaFragmentKey(int partitionId, long offset) {
+      this.partitionId = partitionId;
+      this.offset = offset;
+    }
+
+    public int getPartitionId() {
+      return partitionId;
+    }
+
+    public long getOffset() {
+      return offset;
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder builder = new StringBuilder();
+      builder.append("{\"partitionId\":");
+      builder.append(partitionId);
+      builder.append(", \"offset\":");
+      builder.append(offset);
+      builder.append("}");
+      return builder.toString();
+    }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + (int) (offset ^ (offset >>> 32));
+    result = prime * result + partitionId;
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    KafkaFragmentKey other = (KafkaFragmentKey) obj;
+    if (offset != other.offset)
+      return false;
+    if (partitionId != other.partitionId)
+      return false;
+    return true;
+  }
+
+    @Override
+    public int compareTo(KafkaFragmentKey o) {
+      int result = Integer.compare(partitionId, o.partitionId);
+      if (result == 0) {
+        result = Long.compare(offset, o.offset);
+      }
+      return result;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4e2d21c2/tajo-storage/tajo-storage-kafka/src/main/java/org/apache/tajo/storage/kafka/KafkaFragmentSerde.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-kafka/src/main/java/org/apache/tajo/storage/kafka/KafkaFragmentSerde.java b/tajo-storage/tajo-storage-kafka/src/main/java/org/apache/tajo/storage/kafka/KafkaFragmentSerde.java
new file mode 100644
index 0000000..2899b7c
--- /dev/null
+++ b/tajo-storage/tajo-storage-kafka/src/main/java/org/apache/tajo/storage/kafka/KafkaFragmentSerde.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.kafka;
+
+import org.apache.tajo.storage.fragment.FragmentSerde;
+import org.apache.tajo.storage.kafka.StorageFragmentProtos.KafkaFragmentProto;
+
+import java.net.URI;
+
+import com.google.protobuf.GeneratedMessage.Builder;
+
+public class KafkaFragmentSerde implements FragmentSerde<KafkaFragment, KafkaFragmentProto> {
+
+  @Override
+  public Builder newBuilder() {
+    return KafkaFragmentProto.newBuilder();
+  }
+
+  @Override
+  public KafkaFragmentProto serialize(KafkaFragment fragment) {
+    return KafkaFragmentProto.newBuilder()
+        .setUri(fragment.getUri().toASCIIString())
+        .setTableName(fragment.getInputSourceId())
+        .setTopicName(fragment.getTopicName())
+        .setStartOffset(fragment.getStartKey().getOffset())
+        .setLastOffset(fragment.getEndKey().getOffset())
+        // partitionId is a required proto field; omitting it makes build()
+        // throw UninitializedMessageException and deserialize() would lose
+        // the partition. Start and end key always share the same partition.
+        .setPartitionId(fragment.getStartKey().getPartitionId())
+        .setLast(fragment.isLast())
+        .setLength(fragment.getLength())
+        .setLeaderHost(fragment.getHostNames().get(0))
+        .build();
+  }
+
+  @Override
+  public KafkaFragment deserialize(KafkaFragmentProto proto) {
+    return new KafkaFragment(
+        URI.create(proto.getUri()),
+        proto.getTableName(),
+        proto.getTopicName(),
+        proto.getStartOffset(),
+        proto.getLastOffset(),
+        proto.getPartitionId(),
+        proto.getLeaderHost(),
+        proto.getLast());
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4e2d21c2/tajo-storage/tajo-storage-kafka/src/main/java/org/apache/tajo/storage/kafka/KafkaScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-kafka/src/main/java/org/apache/tajo/storage/kafka/KafkaScanner.java b/tajo-storage/tajo-storage-kafka/src/main/java/org/apache/tajo/storage/kafka/KafkaScanner.java
new file mode 100644
index 0000000..496dfa6
--- /dev/null
+++ b/tajo-storage/tajo-storage-kafka/src/main/java/org/apache/tajo/storage/kafka/KafkaScanner.java
@@ -0,0 +1,273 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.kafka;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.exception.TajoRuntimeException;
+import org.apache.tajo.exception.UnsupportedException;
+import org.apache.tajo.plan.expr.EvalNode;
+import org.apache.tajo.plan.logical.LogicalNode;
+import org.apache.tajo.storage.EmptyTuple;
+import org.apache.tajo.storage.Scanner;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.storage.fragment.Fragment;
+import org.apache.tajo.storage.kafka.KafkaFragment.KafkaFragmentKey;
+import org.apache.tajo.storage.text.DelimitedTextFile;
+import org.apache.tajo.storage.text.TextLineDeserializer;
+import org.apache.tajo.storage.text.TextLineParsingError;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
+public class KafkaScanner implements Scanner {
+  private static final Log LOG = LogFactory.getLog(KafkaScanner.class);
+  protected boolean inited = false;
+
+  // Number of rows returned so far; reported via getInputStats().
+  private int numRows = 0;
+  // Index of the next record to return from the current fetched batch.
+  private int readRecordIndex = -1;
+  private int fragmentSize;
+
+  // Timeout in milliseconds for a single consumer poll.
+  private long pollTimeout;
+
+  private KafkaFragmentKey startKey;
+  private KafkaFragmentKey endKey;
+  // Next offset to fetch from the partition.
+  private long currentOffset;
+
+  private SimpleConsumerManager simpleConsumerManager;
+
+  // Batch of records most recently fetched from Kafka; null before first fetch.
+  private List<ConsumerRecord<byte[], byte[]>> records = null;
+
+  private Schema schema;
+  private TableMeta meta;
+  private TableStats tableStats;
+  private KafkaFragment fragment;
+  private Column[] targets;
+  private TextLineDeserializer deserializer;
+  private boolean finished;
+
+  private float progress = 0.0f;
+
+  // Reusable output tuple, refilled on every next() call.
+  private Tuple outTuple;
+
+  public KafkaScanner(Configuration conf, Schema schema, TableMeta meta, Fragment fragment) throws IOException {
+    this.schema = schema;
+    this.meta = meta;
+    this.fragment = (KafkaFragment) fragment;
+    this.tableStats = new TableStats();
+  }
+
+  @Override
+  public void init() throws IOException {
+    inited = true;
+
+    if (targets == null) {
+      targets = schema.toArray();
+    }
+
+    outTuple = new VTuple(targets.length);
+
+    fragmentSize = Integer.parseInt(meta.getProperty(KafkaStorageConstants.KAFKA_FRAGMENT_SIZE,
+        KafkaStorageConstants.DEFAULT_FRAGMENT_SIZE));
+
+    pollTimeout = Long.parseLong(meta.getProperty(KafkaStorageConstants.KAFKA_POLL_TIMEOUT,
+        KafkaStorageConstants.DEFAULT_POLL_TIMEOUT));
+
+    // create deserializer. default is DELIMITER('|') text deserializer.
+    deserializer = DelimitedTextFile.getLineSerde(meta).createDeserializer(schema, meta, targets);
+    deserializer.init();
+
+    simpleConsumerManager = new SimpleConsumerManager(fragment.getUri(), fragment.getTopicName(),
+        fragment.getStartKey().getPartitionId(), fragmentSize);
+
+    initOffset();
+  }
+
+  // Resets the read position to the fragment's start offset.
+  private void initOffset() {
+    startKey = fragment.getStartKey();
+    endKey = fragment.getEndKey();
+    currentOffset = startKey.getOffset();
+  }
+
+  @Override
+  public Tuple next() throws IOException {
+    if (finished) {
+      return null;
+    }
+
+    // Fetch a new batch when none is loaded or the current one is exhausted.
+    if (records == null || readRecordIndex >= records.size()) {
+      records = readMessage();
+      if (records.isEmpty()) {
+        finished = true;
+        progress = 1.0f;
+        return null;
+      }
+      readRecordIndex = 0;
+    }
+
+    // 'records' holds the fetched batch; 'readRecordIndex' points at the
+    // next record to consume within it.
+    ConsumerRecord<byte[], byte[]> record = records.get(readRecordIndex++);
+    byte[] value = record.value();
+    if (value == null || targets.length == 0) {
+      // Null payload or zero projected columns: count the row, emit nothing.
+      numRows++;
+      return EmptyTuple.get();
+    }
+
+    ByteBuf buf = Unpooled.wrappedBuffer(value);
+
+    try {
+      deserializer.deserialize(buf, outTuple);
+    } catch (TextLineParsingError tae) {
+      throw new IOException(tae);
+    } finally {
+      numRows++;
+    }
+    return outTuple;
+  }
+
+  /**
+   * Read messages from kafka, up to the fragment's last offset.
+   *
+   * @return Received records. An empty list means the fragment is exhausted.
+   * @throws IOException
+   */
+  private List<ConsumerRecord<byte[], byte[]>> readMessage() throws IOException {
+    List<ConsumerRecord<byte[], byte[]>> receivedRecords = new ArrayList<>();
+    // '>=' (rather than '==') guards against an offset jumping past the end.
+    if (currentOffset >= endKey.getOffset()) {
+      // If get the last offset, stop to read topic.
+      return receivedRecords;
+    }
+    // Read data until lastOffset of partition of topic.
+    // Read from simpleConsumer.
+    LOG.info("Read the data of " + fragment + ", current offset: " + currentOffset);
+    ConsumerRecords<byte[], byte[]> consumerRecords = simpleConsumerManager.poll(currentOffset, pollTimeout);
+    if (consumerRecords.isEmpty()) {
+      return receivedRecords;
+    }
+
+    long readLastOffset = -1;
+    for (ConsumerRecord<byte[], byte[]> consumerRecord : consumerRecords) {
+      readLastOffset = consumerRecord.offset();
+      if (readLastOffset < endKey.getOffset()) {
+        receivedRecords.add(consumerRecord);
+        readLastOffset++; // read a next message.
+      } else {
+        break;
+      }
+    }
+    currentOffset = readLastOffset;
+
+    // read length / total length. Floating-point division is required here:
+    // the long integer division always truncated the progress to 0.
+    long totalLength = endKey.getOffset() - startKey.getOffset();
+    if (totalLength > 0) {
+      progress = (float) (currentOffset - startKey.getOffset()) / totalLength;
+    }
+    return receivedRecords;
+  }
+
+  @Override
+  public void reset() throws IOException {
+    progress = 0.0f;
+    readRecordIndex = -1;
+    records = null;
+    finished = false;
+    tableStats = new TableStats();
+
+    numRows = 0;
+
+    initOffset();
+  }
+
+  @Override
+  public void close() throws IOException {
+    progress = 1.0f;
+    finished = true;
+
+    if (simpleConsumerManager != null) {
+      simpleConsumerManager.close();
+    }
+    simpleConsumerManager = null;
+  }
+
+  @Override
+  public void pushOperators(LogicalNode planPart) {
+    throw new TajoRuntimeException(new UnsupportedException());
+  }
+
+  @Override
+  public boolean isProjectable() {
+    return true;
+  }
+
+  @Override
+  public void setTarget(Column[] targets) {
+    if (inited) {
+      throw new IllegalStateException("Should be called before init()");
+    }
+    this.targets = targets;
+  }
+
+  @Override
+  public boolean isSelectable() {
+    return false;
+  }
+
+  @Override
+  public void setFilter(EvalNode filter) {
+    throw new TajoRuntimeException(new UnsupportedException());
+  }
+
+  @Override
+  public boolean isSplittable() {
+    return false;
+  }
+
+  @Override
+  public void setLimit(long num) {
+    // Limit push-down is not supported by this scanner.
+  }
+
+  @Override
+  public float getProgress() {
+    return this.progress;
+  }
+
+  @Override
+  public TableStats getInputStats() {
+    if (tableStats != null) {
+      tableStats.setNumRows(numRows);
+    }
+    return tableStats;
+  }
+
+  @Override
+  public Schema getSchema() {
+    return this.schema;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4e2d21c2/tajo-storage/tajo-storage-kafka/src/main/java/org/apache/tajo/storage/kafka/KafkaStorageConstants.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-kafka/src/main/java/org/apache/tajo/storage/kafka/KafkaStorageConstants.java b/tajo-storage/tajo-storage-kafka/src/main/java/org/apache/tajo/storage/kafka/KafkaStorageConstants.java
new file mode 100644
index 0000000..45424d3
--- /dev/null
+++ b/tajo-storage/tajo-storage-kafka/src/main/java/org/apache/tajo/storage/kafka/KafkaStorageConstants.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.kafka;
+
+import org.apache.tajo.storage.StorageConstants;
+
+public class KafkaStorageConstants extends StorageConstants {
+  // Table property keys for the Kafka storage handler.
+  public static final String KAFKA_TOPIC = "kafka.topic";
+  public static final String KAFKA_TOPIC_PARTITION = "kafka.topic.partition";
+  // Fragment size passed to SimpleConsumerManager by KafkaScanner —
+  // presumably the max records per fetch; confirm against SimpleConsumerManager.
+  public static final String KAFKA_FRAGMENT_SIZE = "kafka.fragment.size";
+  // Timeout in milliseconds for a single consumer poll (see KafkaScanner).
+  public static final String KAFKA_POLL_TIMEOUT = "kafka.poll.timeout";
+
+  // NOTE(review): "all" presumably selects every partition — not used in
+  // this commit's visible code; verify against the tablespace implementation.
+  public static final String DEFAULT_PARTITION = "all";
+  public static final String DEFAULT_FRAGMENT_SIZE = "100000";
+  public static final String DEFAULT_POLL_TIMEOUT = "30000";
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4e2d21c2/tajo-storage/tajo-storage-kafka/src/main/proto/StorageFragmentProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-kafka/src/main/proto/StorageFragmentProtos.proto b/tajo-storage/tajo-storage-kafka/src/main/proto/StorageFragmentProtos.proto
new file mode 100644
index 0000000..6888b3a
--- /dev/null
+++ b/tajo-storage/tajo-storage-kafka/src/main/proto/StorageFragmentProtos.proto
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.tajo.storage.kafka";
+option java_outer_classname = "StorageFragmentProtos";
+option optimize_for = SPEED;
+option java_generic_services = false;
+option java_generate_equals_and_hash = true;
+
+import "CatalogProtos.proto";
+
+// Wire format for KafkaFragment (see KafkaFragmentSerde).
+message KafkaFragmentProto {
+  // Kafka cluster URI, e.g. kafka://host:port.
+  required string uri = 1;
+  required string tableName = 2;
+  required string topicName = 3;
+  // Half-open offset range [startOffset, lastOffset) scanned by KafkaScanner.
+  required int64 startOffset = 4;
+  required int64 lastOffset = 5;
+  required int32 partitionId = 6;
+  // True if this fragment is the last one of the partition.
+  required bool last = 7;
+  // Fragment length (lastOffset - startOffset in KafkaFragment's constructor).
+  required int64 length = 8;
+  optional string leaderHost = 9;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/4e2d21c2/tajo-storage/tajo-storage-kafka/src/test/java/org/apache/tajo/storage/kafka/TestKafkaScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-kafka/src/test/java/org/apache/tajo/storage/kafka/TestKafkaScanner.java b/tajo-storage/tajo-storage-kafka/src/test/java/org/apache/tajo/storage/kafka/TestKafkaScanner.java
new file mode 100644
index 0000000..50911be
--- /dev/null
+++ b/tajo-storage/tajo-storage-kafka/src/test/java/org/apache/tajo/storage/kafka/TestKafkaScanner.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.kafka;
+
+import static org.apache.tajo.storage.kafka.KafkaTestUtil.TOPIC_NAME;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.tajo.BuiltinStorages;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SchemaBuilder;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.kafka.server.EmbeddedKafka;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.net.URI;
+
+public class TestKafkaScanner {
+  private static EmbeddedKafka KAFKA;
+  private static Schema TABLE_SCHEMA;
+  private static URI KAFKA_SERVER_URI;
+
+  static {
+    TABLE_SCHEMA = SchemaBuilder.builder()
+        .add("col1", Type.INT4)
+        .add("col2", Type.TEXT)
+        .add("col3", Type.FLOAT4)
+        .build();
+  }
+
+  /**
+   * Start up EmbeddedKafka and Generate test data.
+   *
+   * @throws Exception
+   */
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    KAFKA = EmbeddedKafka.createEmbeddedKafka(2181, 9092);
+    KAFKA.start();
+    KAFKA.createTopic(1, 1, TOPIC_NAME);
+    KAFKA_SERVER_URI = URI.create("kafka://" + KAFKA.getConnectString());
+
+    // Load test data.
+    try (Producer<String, String> producer = KAFKA.createProducer(KAFKA.getConnectString())) {
+      KafkaTestUtil.sendTestData(producer, TOPIC_NAME);
+    }
+  }
+
+  /**
+   * Close EmbeddedKafka.
+   *
+   * @throws Exception
+   */
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    KAFKA.close();
+  }
+
+  /**
+   * Scans the first message of the topic and verifies the deserialized tuple.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testScanner() throws Exception {
+    // Fragment covering offsets [0, 1) of partition 0.
+    KafkaFragment fragment = new KafkaFragment(KAFKA_SERVER_URI, TOPIC_NAME, TOPIC_NAME, 0, 1, 0, "localhost");
+    TajoConf conf = new TajoConf();
+    // Reuse 'conf' instead of allocating a second TajoConf.
+    TableMeta meta = CatalogUtil.newTableMeta(BuiltinStorages.KAFKA, conf);
+    KafkaScanner scanner = new KafkaScanner(conf, TABLE_SCHEMA, meta, fragment);
+    scanner.init();
+    Tuple tuple = scanner.next();
+    assertTrue(tuple.getInt4(0) == 1);
+    assertTrue(tuple.getText(1).equals("abc"));
+    assertTrue(tuple.getFloat4(2) == 0.2f);
+    scanner.close();
+  }
+}