Posted to commits@drill.apache.org by pr...@apache.org on 2017/07/03 20:45:26 UTC

drill git commit: DRILL-5432: Added pcap-format support as format plugin

Repository: drill
Updated Branches:
  refs/heads/master 63e243378 -> f5b975adf


DRILL-5432: Added pcap-format support as format plugin

closes #831


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/f5b975ad
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/f5b975ad
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/f5b975ad

Branch: refs/heads/master
Commit: f5b975adf26622451885149acf26ce4c2c2e0a93
Parents: 63e2433
Author: Vlad Storona <vs...@cybervisiontech.com>
Authored: Thu May 11 13:53:08 2017 +0000
Committer: Paul Rogers <pr...@maprtech.com>
Committed: Mon Jul 3 13:00:50 2017 -0700

----------------------------------------------------------------------
 .../drill/exec/store/pcap/PcapDrillTable.java   |  73 ++++
 .../drill/exec/store/pcap/PcapFormatConfig.java |  24 ++
 .../drill/exec/store/pcap/PcapFormatPlugin.java | 116 ++++++
 .../drill/exec/store/pcap/PcapFormatUtils.java  |  79 ++++
 .../drill/exec/store/pcap/PcapRecordReader.java | 307 +++++++++++++++
 .../exec/store/pcap/decoder/Murmur128.java      | 161 ++++++++
 .../drill/exec/store/pcap/decoder/Packet.java   | 375 +++++++++++++++++++
 .../store/pcap/decoder/PacketConstants.java     |  68 ++++
 .../exec/store/pcap/decoder/PacketDecoder.java  | 116 ++++++
 .../drill/exec/store/pcap/dto/ColumnDto.java    |  63 ++++
 .../drill/exec/store/pcap/package-info.java     |  22 ++
 .../drill/exec/store/pcap/schema/PcapTypes.java |  25 ++
 .../drill/exec/store/pcap/schema/Schema.java    |  64 ++++
 .../resources/bootstrap-storage-plugins.json    |   3 +
 .../dfs/TestFormatPluginOptionExtractor.java    |   1 +
 .../drill/exec/store/pcap/ConcatPcap.java       |  67 ++++
 .../drill/exec/store/pcap/TestPcapDecoder.java  | 239 ++++++++++++
 .../exec/store/pcap/TestPcapRecordReader.java   |  65 ++++
 .../src/test/resources/store/pcap/data-1.pcap   | Bin 0 -> 73016 bytes
 .../src/test/resources/store/pcap/data-2.pcap   | Bin 0 -> 1167896 bytes
 .../src/test/resources/store/pcap/tcp-1.pcap    | Bin 0 -> 1848 bytes
 .../src/test/resources/store/pcap/tcp-2.pcap    | Bin 0 -> 29208 bytes
 .../apache/drill/exec/proto/UserBitShared.java  |   9 +
 .../exec/proto/beans/CoreOperatorType.java      |   4 +-
 24 files changed, 1880 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
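
With the plugin in place, Drill can query packet capture files directly in SQL.
An illustrative query (the path is hypothetical; the column names are the ones
populated by PcapRecordReader below):

    SELECT src_ip, dst_ip, src_port, dst_port, packet_length
    FROM dfs.`/tmp/capture.pcap`
    WHERE `type` = 'TCP';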


http://git-wip-us.apache.org/repos/asf/drill/blob/f5b975ad/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapDrillTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapDrillTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapDrillTable.java
new file mode 100644
index 0000000..2fbf67d
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapDrillTable.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.pcap;
+
+import com.google.common.collect.Lists;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.drill.exec.planner.logical.DrillTable;
+import org.apache.drill.exec.store.dfs.FileSystemPlugin;
+import org.apache.drill.exec.store.dfs.FormatSelection;
+import org.apache.drill.exec.store.pcap.dto.ColumnDto;
+import org.apache.drill.exec.store.pcap.schema.PcapTypes;
+import org.apache.drill.exec.store.pcap.schema.Schema;
+
+import java.util.List;
+
+public class PcapDrillTable extends DrillTable {
+
+  private final Schema schema;
+
+
+  public PcapDrillTable(String storageEngineName, FileSystemPlugin plugin, String userName, FormatSelection selection) {
+    super(storageEngineName, plugin, userName, selection);
+    this.schema = new Schema();
+  }
+
+  @Override
+  public RelDataType getRowType(RelDataTypeFactory typeFactory) {
+    List<RelDataType> typeList = Lists.newArrayList();
+    List<String> fieldNameList = Lists.newArrayList();
+    convertToRelDataType(typeFactory, fieldNameList, typeList);
+    return typeFactory.createStructType(typeList, fieldNameList);
+  }
+
+  private RelDataType getSqlTypeFromPcapType(RelDataTypeFactory typeFactory, PcapTypes type) {
+    switch (type) {
+      case LONG:
+        return typeFactory.createSqlType(SqlTypeName.BIGINT);
+      case INTEGER:
+        return typeFactory.createSqlType(SqlTypeName.INTEGER);
+      case STRING:
+        return typeFactory.createSqlType(SqlTypeName.VARCHAR);
+      case TIMESTAMP:
+        return typeFactory.createSqlType(SqlTypeName.TIMESTAMP);
+      default:
+        throw new UnsupportedOperationException("Unsupported type.");
+    }
+  }
+
+  private void convertToRelDataType(RelDataTypeFactory typeFactory, List<String> names, List<RelDataType> types) {
+    for (ColumnDto column : schema.getColumns()) {
+      names.add(column.getColumnName());
+      RelDataType type = getSqlTypeFromPcapType(typeFactory, column.getColumnType());
+      type = typeFactory.createTypeWithNullability(type, column.isNullable());
+      types.add(type);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/f5b975ad/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapFormatConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapFormatConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapFormatConfig.java
new file mode 100644
index 0000000..4e44839
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapFormatConfig.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.pcap;
+
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.logical.FormatPluginConfig;
+
+@JsonTypeName("pcap")
+public class PcapFormatConfig implements FormatPluginConfig {
+}
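
The @JsonTypeName value above is the key users reference when enabling the
format on a file system storage plugin. A sketch of the JSON fragment that
would go in a dfs storage configuration (illustrative only; this commit's
actual three-line bootstrap entry lives in bootstrap-storage-plugins.json and
is not shown in this excerpt):

    "formats": {
      "pcap": {
        "type": "pcap"
      }
    }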

http://git-wip-us.apache.org/repos/asf/drill/blob/f5b975ad/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapFormatPlugin.java
new file mode 100644
index 0000000..a903705
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapFormatPlugin.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.pcap;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.planner.logical.DrillTable;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.store.RecordWriter;
+import org.apache.drill.exec.store.dfs.BasicFormatMatcher;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.FileSelection;
+import org.apache.drill.exec.store.dfs.FileSystemPlugin;
+import org.apache.drill.exec.store.dfs.FormatMatcher;
+import org.apache.drill.exec.store.dfs.FormatSelection;
+import org.apache.drill.exec.store.dfs.MagicString;
+import org.apache.drill.exec.store.dfs.NamedFormatPluginConfig;
+import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
+import org.apache.drill.exec.store.dfs.easy.EasyWriter;
+import org.apache.drill.exec.store.dfs.easy.FileWork;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.regex.Pattern;
+
+public class PcapFormatPlugin extends EasyFormatPlugin<PcapFormatConfig> {
+
+  private final PcapFormatMatcher matcher;
+
+  public PcapFormatPlugin(String name, DrillbitContext context, Configuration fsConf,
+                          StoragePluginConfig storagePluginConfig) {
+    this(name, context, fsConf, storagePluginConfig, new PcapFormatConfig());
+  }
+
+  public PcapFormatPlugin(String name, DrillbitContext context, Configuration fsConf, StoragePluginConfig config, PcapFormatConfig formatPluginConfig) {
+    super(name, context, fsConf, config, formatPluginConfig, true, false, true, false, Lists.newArrayList("pcap"), "pcap");
+    this.matcher = new PcapFormatMatcher(this);
+  }
+
+  @Override
+  public boolean supportsPushDown() {
+    return true;
+  }
+
+  @Override
+  public RecordReader getRecordReader(FragmentContext context, DrillFileSystem dfs, FileWork fileWork, List<SchemaPath> columns, String userName) throws ExecutionSetupException {
+    String path = dfs.makeQualified(new Path(fileWork.getPath())).toUri().getPath();
+    return new PcapRecordReader(path, columns);
+  }
+
+  @Override
+  public RecordWriter getRecordWriter(FragmentContext context, EasyWriter writer) throws IOException {
+    throw new UnsupportedOperationException("unimplemented");
+  }
+
+  @Override
+  public int getReaderOperatorType() {
+    return UserBitShared.CoreOperatorType.PCAP_SUB_SCAN_VALUE;
+  }
+
+  @Override
+  public int getWriterOperatorType() {
+    return 0;
+  }
+
+  @Override
+  public FormatMatcher getMatcher() {
+    return this.matcher;
+  }
+
+  private static class PcapFormatMatcher extends BasicFormatMatcher {
+
+    public PcapFormatMatcher(PcapFormatPlugin plugin) {
+      super(plugin, ImmutableList.of(Pattern.compile(".*\\.pcap$")), ImmutableList.<MagicString>of());
+    }
+
+    @Override
+    public DrillTable isReadable(DrillFileSystem fs,
+                                 FileSelection selection, FileSystemPlugin fsPlugin,
+                                 String storageEngineName, String userName) throws IOException {
+      if (isFileReadable(fs, selection.getFirstPath(fs))) {
+        if (plugin.getName() != null) {
+          NamedFormatPluginConfig namedConfig = new NamedFormatPluginConfig();
+          namedConfig.name = plugin.getName();
+          return new PcapDrillTable(storageEngineName, fsPlugin, userName, new FormatSelection(namedConfig, selection));
+        } else {
+          return new PcapDrillTable(storageEngineName, fsPlugin, userName, new FormatSelection(plugin.getConfig(), selection));
+        }
+      }
+      return null;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/f5b975ad/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapFormatUtils.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapFormatUtils.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapFormatUtils.java
new file mode 100644
index 0000000..bba08be
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapFormatUtils.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.pcap;
+
+import com.google.common.primitives.Ints;
+import com.google.common.primitives.Shorts;
+
+public class PcapFormatUtils {
+
+  /**
+   * @param byteOrder true for forward file order, false for reverse file order
+   * @param buf byte buffer
+   * @param offset buffer offset
+   * @return integer value of specific bytes from buffer
+   */
+  public static int getIntFileOrder(boolean byteOrder, final byte[] buf, final int offset) {
+    if (byteOrder) {
+      return Ints.fromBytes(buf[offset], buf[offset + 1], buf[offset + 2], buf[offset + 3]);
+    } else {
+      return Ints.fromBytes(buf[offset + 3], buf[offset + 2], buf[offset + 1], buf[offset]);
+    }
+  }
+
+  /**
+   * @param byteOrder true for forward file order, false for reverse file order
+   * @param buf byte buffer
+   * @param offset buffer offset
+   * @return short value as int of specific bytes from buffer
+   */
+  public static int getShortFileOrder(boolean byteOrder, final byte[] buf, final int offset) {
+    if (byteOrder) {
+      return Shorts.fromBytes(buf[offset], buf[offset + 1]);
+    } else {
+      return Shorts.fromBytes(buf[offset + 1], buf[offset]);
+    }
+  }
+
+  public static int getInt(final byte[] buf, final int offset) {
+    return Ints.fromBytes(buf[offset], buf[offset + 1], buf[offset + 2], buf[offset + 3]);
+  }
+
+  public static int getShort(final byte[] buf, final int offset) {
+    return 0xffff & Shorts.fromBytes(buf[offset], buf[offset + 1]);
+  }
+
+  public static int getByte(final byte[] buf, final int offset) {
+    return 0xff & buf[offset];
+  }
+
+  public static int convertShort(final byte[] data, int offset) {
+    return ((data[offset] & 0xFF) << 8) | (data[offset + 1] & 0xFF);
+  }
+
+  public static int convertInt(final byte[] data, int offset) {
+    return ((data[offset] & 0xFF) << 24) | ((data[offset + 1] & 0xFF) << 16) |
+        ((data[offset + 2] & 0xFF) << 8) | (data[offset + 3] & 0xFF);
+  }
+
+  public static String parseBytesToASCII(byte[] data) {
+    return new String(data).trim()
+        .replaceAll("\\P{Print}", ".");
+  }
+}
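
A minimal sketch of how the byte-order helpers above behave, assuming
PcapFormatUtils is on the classpath (the expected values follow directly from
the byte layout):

    byte[] buf = {0x0a, 0x0b, 0x0c, 0x0d};

    // Forward (big-endian) file order reads the bytes as written: 0x0a0b0c0d.
    int forward = PcapFormatUtils.getIntFileOrder(true, buf, 0);   // 168496141

    // Reversed (little-endian) file order swaps them: 0x0d0c0b0a.
    int reversed = PcapFormatUtils.getIntFileOrder(false, buf, 0); // 218893066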

http://git-wip-us.apache.org/repos/asf/drill/blob/f5b975ad/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapRecordReader.java
new file mode 100644
index 0000000..36c3646
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapRecordReader.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.pcap;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.store.pcap.decoder.Packet;
+import org.apache.drill.exec.store.pcap.decoder.PacketDecoder;
+import org.apache.drill.exec.store.pcap.dto.ColumnDto;
+import org.apache.drill.exec.store.pcap.schema.PcapTypes;
+import org.apache.drill.exec.store.pcap.schema.Schema;
+import org.apache.drill.exec.vector.NullableBigIntVector;
+import org.apache.drill.exec.vector.NullableIntVector;
+import org.apache.drill.exec.vector.NullableTimeStampVector;
+import org.apache.drill.exec.vector.NullableVarCharVector;
+import org.apache.drill.exec.vector.ValueVector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.drill.exec.store.pcap.PcapFormatUtils.parseBytesToASCII;
+
+public class PcapRecordReader extends AbstractRecordReader {
+  private static final Logger logger = LoggerFactory.getLogger(PcapRecordReader.class);
+
+  private static final int BATCH_SIZE = 40_000;
+
+  private OutputMutator output;
+
+  private PacketDecoder decoder;
+  private ImmutableList<ProjectedColumnInfo> projectedCols;
+
+  private byte[] buffer;
+  private int offset = 0;
+  private InputStream in;
+  private int validBytes;
+
+  private String inputPath;
+  private List<SchemaPath> projectedColumns;
+
+  private static final Map<PcapTypes, MinorType> TYPES;
+
+  private static class ProjectedColumnInfo {
+    ValueVector vv;
+    ColumnDto pcapColumn;
+  }
+
+  static {
+    TYPES = ImmutableMap.<PcapTypes, TypeProtos.MinorType>builder()
+        .put(PcapTypes.STRING, MinorType.VARCHAR)
+        .put(PcapTypes.INTEGER, MinorType.INT)
+        .put(PcapTypes.LONG, MinorType.BIGINT)
+        .put(PcapTypes.TIMESTAMP, MinorType.TIMESTAMP)
+        .build();
+  }
+
+  public PcapRecordReader(final String inputPath,
+                          final List<SchemaPath> projectedColumns) {
+    this.inputPath = inputPath;
+    this.projectedColumns = projectedColumns;
+  }
+
+  @Override
+  public void setup(final OperatorContext context, final OutputMutator output) throws ExecutionSetupException {
+    try {
+
+      this.output = output;
+      this.buffer = new byte[100000];
+      this.in = new FileInputStream(inputPath);
+      this.decoder = new PacketDecoder(in);
+      this.validBytes = in.read(buffer);
+      this.projectedCols = getProjectedColsIfItNull();
+      setColumns(projectedColumns);
+    } catch (IOException io) {
+      throw UserException.dataReadError(io)
+          .addContext("File name:", inputPath)
+          .build(logger);
+    }
+  }
+
+  @Override
+  public int next() {
+    try {
+      return parsePcapFilesAndPutItToTable();
+    } catch (IOException io) {
+      throw UserException.dataReadError(io)
+          .addContext("Trouble with reading packets in file!")
+          .build(logger);
+    }
+  }
+
+  @Override
+  public void close() throws Exception {
+    buffer = null;
+    if (in != null) {
+      in.close();
+    }
+  }
+
+  private ImmutableList<ProjectedColumnInfo> getProjectedColsIfItNull() {
+    return projectedCols != null ? projectedCols : initCols(new Schema());
+  }
+
+  private ImmutableList<ProjectedColumnInfo> initCols(final Schema schema) {
+    ImmutableList.Builder<ProjectedColumnInfo> pciBuilder = ImmutableList.builder();
+    ColumnDto column;
+
+    for (int i = 0; i < schema.getNumberOfColumns(); i++) {
+      column = schema.getColumnByIndex(i);
+
+      final String name = column.getColumnName().toLowerCase();
+      final PcapTypes type = column.getColumnType();
+      TypeProtos.MinorType minorType = TYPES.get(type);
+
+      ProjectedColumnInfo pci = getProjectedColumnInfo(column, name, minorType);
+      pciBuilder.add(pci);
+    }
+    return pciBuilder.build();
+  }
+
+  private ProjectedColumnInfo getProjectedColumnInfo(final ColumnDto column,
+                                                     final String name,
+                                                     final MinorType minorType) {
+    TypeProtos.MajorType majorType = getMajorType(minorType);
+
+    MaterializedField field =
+        MaterializedField.create(name, majorType);
+
+    ValueVector vector =
+        getValueVector(minorType, majorType, field);
+
+    return getProjectedColumnInfo(column, vector);
+  }
+
+  private ProjectedColumnInfo getProjectedColumnInfo(final ColumnDto column, final ValueVector vector) {
+    ProjectedColumnInfo pci = new ProjectedColumnInfo();
+    pci.vv = vector;
+    pci.pcapColumn = column;
+    return pci;
+  }
+
+  private MajorType getMajorType(final MinorType minorType) {
+    return Types.optional(minorType);
+  }
+
+  private ValueVector getValueVector(final MinorType minorType,
+                                     final MajorType majorType,
+                                     final MaterializedField field) {
+    try {
+
+      final Class<? extends ValueVector> clazz = TypeHelper.getValueVectorClass(
+          minorType, majorType.getMode());
+      ValueVector vector = output.addField(field, clazz);
+      vector.allocateNew();
+      return vector;
+
+    } catch (SchemaChangeException sce) {
+      throw new IllegalStateException("The addition of this field is incompatible with this OutputMutator's capabilities", sce);
+    }
+  }
+
+  private int parsePcapFilesAndPutItToTable() throws IOException {
+    Packet packet = new Packet();
+    int counter = 0;
+    while (offset < validBytes && counter != BATCH_SIZE) {
+
+      if (validBytes - offset < 9000) {
+        System.arraycopy(buffer, offset, buffer, 0, validBytes - offset);
+        validBytes = validBytes - offset;
+        offset = 0;
+
+        int n = in.read(buffer, validBytes, buffer.length - validBytes);
+        if (n > 0) {
+          validBytes += n;
+        }
+      }
+
+      offset = decoder.decodePacket(buffer, offset, packet);
+
+      if (addDataToTable(packet, decoder.getNetwork(), counter)) {
+        counter++;
+      }
+    }
+    return counter;
+  }
+
+  private boolean addDataToTable(final Packet packet, final int networkType, final int count) {
+    for (ProjectedColumnInfo pci : projectedCols) {
+      switch (pci.pcapColumn.getColumnName()) {
+        case "type":
+          setStringColumnValue(packet.getPacketType(), pci, count);
+          break;
+        case "timestamp":
+          setTimestampColumnValue(packet.getTimestamp(), pci, count);
+          break;
+        case "network":
+          setIntegerColumnValue(networkType, pci, count);
+          break;
+        case "src_mac_address":
+          setStringColumnValue(packet.getEthernetSource(), pci, count);
+          break;
+        case "dst_mac_address":
+          setStringColumnValue(packet.getEthernetDestination(), pci, count);
+          break;
+        case "dst_ip":
+          if (packet.getDst_ip() != null) {
+            setStringColumnValue(packet.getDst_ip().getHostAddress(), pci, count);
+          } else {
+            setStringColumnValue(null, pci, count);
+          }
+          break;
+        case "src_ip":
+          if (packet.getSrc_ip() != null) {
+            setStringColumnValue(packet.getSrc_ip().getHostAddress(), pci, count);
+          } else {
+            setStringColumnValue(null, pci, count);
+          }
+          break;
+        case "src_port":
+          setIntegerColumnValue(packet.getSrc_port(), pci, count);
+          break;
+        case "dst_port":
+          setIntegerColumnValue(packet.getDst_port(), pci, count);
+          break;
+        case "tcp_session":
+          if (packet.isTcpPacket()) {
+            setLongColumnValue(packet.getSessionHash(), pci, count);
+          }
+          break;
+        case "tcp_sequence":
+          if (packet.isTcpPacket()) {
+            setIntegerColumnValue(packet.getSequenceNumber(), pci, count);
+          }
+          break;
+        case "packet_length":
+          setIntegerColumnValue(packet.getPacketLength(), pci, count);
+          break;
+        case "data":
+          if (packet.getData() != null) {
+            setStringColumnValue(parseBytesToASCII(packet.getData()), pci, count);
+          } else {
+            setStringColumnValue("[]", pci, count);
+          }
+          break;
+      }
+    }
+    return true;
+  }
+
+  private void setLongColumnValue(long data, ProjectedColumnInfo pci, final int count) {
+    ((NullableBigIntVector.Mutator) pci.vv.getMutator())
+        .setSafe(count, data);
+  }
+
+  private void setIntegerColumnValue(final int data, final ProjectedColumnInfo pci, final int count) {
+    ((NullableIntVector.Mutator) pci.vv.getMutator())
+        .setSafe(count, data);
+  }
+
+  private void setTimestampColumnValue(final long data, final ProjectedColumnInfo pci, final int count) {
+    ((NullableTimeStampVector.Mutator) pci.vv.getMutator())
+        .setSafe(count, data);
+  }
+
+  private void setStringColumnValue(final String data, final ProjectedColumnInfo pci, final int count) {
+    if (data == null) {
+      ((NullableVarCharVector.Mutator) pci.vv.getMutator())
+          .setNull(count);
+    } else {
+      ByteBuffer value = ByteBuffer.wrap(data.getBytes(UTF_8));
+      ((NullableVarCharVector.Mutator) pci.vv.getMutator())
+          .setSafe(count, value, 0, value.remaining());
+    }
+  }
+}
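
The reader above keeps one reusable buffer: when fewer than 9000 unparsed
bytes remain, it slides the tail to the front and tops the buffer up from the
stream. A standalone sketch of that compact-and-refill pattern (simplified
from parsePcapFilesAndPutItToTable; names are illustrative):

    import java.io.IOException;
    import java.io.InputStream;

    class BufferRefill {
      /** Compact unread bytes [offset, validBytes) to the front of buf,
       *  then top the buffer up from in. Returns the new count of valid bytes. */
      static int compactAndRefill(InputStream in, byte[] buf, int offset, int validBytes)
          throws IOException {
        int remaining = validBytes - offset;
        System.arraycopy(buf, offset, buf, 0, remaining);
        int n = in.read(buf, remaining, buf.length - remaining);
        return n > 0 ? remaining + n : remaining;  // n == -1 at end of stream
      }
    }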

http://git-wip-us.apache.org/repos/asf/drill/blob/f5b975ad/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/Murmur128.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/Murmur128.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/Murmur128.java
new file mode 100644
index 0000000..3ef0c21
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/Murmur128.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.pcap.decoder;
+
+/**
+ * Simple port of MurmurHash with some state management.
+ *
+ * Drill's internal hashing isn't useful here because it only deals with off-heap memory.
+ */
+public class Murmur128 {
+  private long h1;
+  private long h2;
+  private byte[] buf = new byte[8];
+
+  public Murmur128(long h1, long h2) {
+    this.h1 = h1;
+    this.h2 = h2;
+  }
+
+  private long getLongLittleEndian(byte[] buffer, int offset) {
+    return ((long) buffer[offset + 7] << 56)
+        | ((buffer[offset + 6] & 0xffL) << 48)
+        | ((buffer[offset + 5] & 0xffL) << 40)
+        | ((buffer[offset + 4] & 0xffL) << 32)
+        | ((buffer[offset + 3] & 0xffL) << 24)
+        | ((buffer[offset + 2] & 0xffL) << 16)
+        | ((buffer[offset + 1] & 0xffL) << 8)
+        | ((buffer[offset] & 0xffL));
+  }
+
+  private static long fmix64(long k) {
+    k ^= k >>> 33;
+    k *= 0xff51afd7ed558ccdL;
+    k ^= k >>> 33;
+    k *= 0xc4ceb9fe1a85ec53L;
+    k ^= k >>> 33;
+    return k;
+  }
+
+  public void hash(int x) {
+    buf[0] = (byte) (x & 0xffL);
+    buf[1] = (byte) ((x >> 8) & 0xffL);
+    buf[2] = (byte) ((x >> 16) & 0xffL);
+    buf[3] = (byte) ((x >> 24) & 0xffL);
+    hash(buf, 0, 4);
+  }
+
+  public void hash(long x) {
+    buf[0] = (byte) (x & 0xffL);
+    buf[1] = (byte) ((x >> 8) & 0xffL);
+    buf[2] = (byte) ((x >> 16) & 0xffL);
+    buf[3] = (byte) ((x >> 24) & 0xffL);
+    buf[4] = (byte) ((x >> 32) & 0xffL);
+    buf[5] = (byte) ((x >> 40) & 0xffL);
+    buf[6] = (byte) ((x >> 48) & 0xffL);
+    buf[7] = (byte) ((x >> 56) & 0xffL);
+    hash(buf, 0, 8);
+  }
+
+  public void hash(byte[] buffer, int start, int end) {
+    final long c1 = 0x87c37b91114253d5L;
+    final long c2 = 0x4cf5ad432745937fL;
+    int length = end - start;
+    int roundedEnd = start + (length & 0xFFFFFFF0);  // round down to 16 byte block
+    for (int i = start; i < roundedEnd; i += 16) {
+      long k1 = getLongLittleEndian(buffer, i);
+      long k2 = getLongLittleEndian(buffer, i + 8);
+      k1 *= c1;
+      k1 = Long.rotateLeft(k1, 31);
+      k1 *= c2;
+      h1 ^= k1;
+      h1 = Long.rotateLeft(h1, 27);
+      h1 += h2;
+      h1 = h1 * 5 + 0x52dce729;
+      k2 *= c2;
+      k2 = Long.rotateLeft(k2, 33);
+      k2 *= c1;
+      h2 ^= k2;
+      h2 = Long.rotateLeft(h2, 31);
+      h2 += h1;
+      h2 = h2 * 5 + 0x38495ab5;
+    }
+
+    long k1 = 0;
+    long k2 = 0;
+
+    // tail
+    switch (length & 15) {
+      case 15:
+        k2 = (buffer[roundedEnd + 14] & 0xffL) << 48;
+      case 14:
+        k2 ^= (buffer[roundedEnd + 13] & 0xffL) << 40;
+      case 13:
+        k2 ^= (buffer[roundedEnd + 12] & 0xffL) << 32;
+      case 12:
+        k2 ^= (buffer[roundedEnd + 11] & 0xffL) << 24;
+      case 11:
+        k2 ^= (buffer[roundedEnd + 10] & 0xffL) << 16;
+      case 10:
+        k2 ^= (buffer[roundedEnd + 9] & 0xffL) << 8;
+      case 9:
+        k2 ^= (buffer[roundedEnd + 8] & 0xffL);
+        k2 *= c2;
+        k2 = Long.rotateLeft(k2, 33);
+        k2 *= c1;
+        h2 ^= k2;
+      case 8:
+        k1 = (long) buffer[roundedEnd + 7] << 56;
+      case 7:
+        k1 ^= (buffer[roundedEnd + 6] & 0xffL) << 48;
+      case 6:
+        k1 ^= (buffer[roundedEnd + 5] & 0xffL) << 40;
+      case 5:
+        k1 ^= (buffer[roundedEnd + 4] & 0xffL) << 32;
+      case 4:
+        k1 ^= (buffer[roundedEnd + 3] & 0xffL) << 24;
+      case 3:
+        k1 ^= (buffer[roundedEnd + 2] & 0xffL) << 16;
+      case 2:
+        k1 ^= (buffer[roundedEnd + 1] & 0xffL) << 8;
+      case 1:
+        k1 ^= (buffer[roundedEnd] & 0xffL);
+        k1 *= c1;
+        k1 = Long.rotateLeft(k1, 31);
+        k1 *= c2;
+        h1 ^= k1;
+    }
+
+    h1 ^= length;
+    h2 ^= length;
+
+    h1 += h2;
+    h2 += h1;
+
+    h1 = fmix64(h1);
+    h2 = fmix64(h2);
+
+    h1 += h2;
+    h2 += h1;
+  }
+
+  @SuppressWarnings("WeakerAccess")
+  public long digest64() {
+    return h1 ^ h2;
+  }
+}
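
Packet.getSessionHash (below) hashes the two TCP endpoints separately and XORs
the digests, so both directions of a session map to the same value. A sketch
of that use, assuming the Murmur128 class above:

    static long sessionHash(byte[] ipA, int portA, byte[] ipB, int portB) {
      Murmur128 ha = new Murmur128(1, 2);
      ha.hash(ipA, 0, ipA.length);
      ha.hash(portA);

      Murmur128 hb = new Murmur128(1, 2);
      hb.hash(ipB, 0, ipB.length);
      hb.hash(portB);

      // XOR is commutative, so swapping endpoints A and B yields the same hash.
      return ha.digest64() ^ hb.digest64();
    }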

http://git-wip-us.apache.org/repos/asf/drill/blob/f5b975ad/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/Packet.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/Packet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/Packet.java
new file mode 100644
index 0000000..0a45290
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/Packet.java
@@ -0,0 +1,375 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.pcap.decoder;
+
+import com.google.common.base.Preconditions;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+import static org.apache.drill.exec.store.pcap.PcapFormatUtils.convertInt;
+import static org.apache.drill.exec.store.pcap.PcapFormatUtils.convertShort;
+import static org.apache.drill.exec.store.pcap.PcapFormatUtils.getByte;
+import static org.apache.drill.exec.store.pcap.PcapFormatUtils.getIntFileOrder;
+import static org.apache.drill.exec.store.pcap.PcapFormatUtils.getShort;
+
+public class Packet {
+  // pcap header
+  //        typedef struct pcaprec_hdr_s {
+  //            guint32 ts_sec;         // timestamp seconds
+  //            guint32 ts_usec;        // timestamp microseconds
+  //            guint32 incl_len;       // number of octets of packet saved in file
+  //            guint32 orig_len;       // actual length of packet
+  //        } pcaprec_hdr_t;
+  private long timestamp;
+  private int originalLength;
+
+  private byte[] raw;
+
+  private int etherOffset;
+  private int ipOffset;
+
+  private int packetLength;
+  private int etherProtocol;
+  private int protocol;
+
+  private boolean isRoutingV6;
+
+  @SuppressWarnings("WeakerAccess")
+  public boolean readPcap(final InputStream in, final boolean byteOrder, final int maxLength) throws IOException {
+    byte[] pcapHeader = new byte[PacketConstants.PCAP_HEADER_SIZE];
+    int n = in.read(pcapHeader);
+    if (n < pcapHeader.length) {
+      return false;
+    }
+    decodePcapHeader(pcapHeader, byteOrder, maxLength, 0);
+
+    raw = new byte[originalLength];
+    n = in.read(raw);
+    if (n < 0) {
+      return false;
+    }
+    etherOffset = 0;
+
+    decodeEtherPacket();
+    return true;
+  }
+
+  @SuppressWarnings("WeakerAccess")
+  public int decodePcap(final byte[] buffer, final int offset, final boolean byteOrder, final int maxLength) {
+    raw = buffer;
+    etherOffset = offset + PacketConstants.PCAP_HEADER_SIZE;
+    decodePcapHeader(raw, byteOrder, maxLength, offset);
+    decodeEtherPacket();
+    return offset + PacketConstants.PCAP_HEADER_SIZE + originalLength;
+  }
+
+  public String getPacketType() {
+    if (isTcpPacket()) {
+      return "TCP";
+    } else if (isUdpPacket()) {
+      return "UDP";
+    } else if (isArpPacket()) {
+      return "ARP";
+    } else if (isIcmpPacket()) {
+      return "ICMP";
+    } else {
+      return "unknown";
+    }
+  }
+
+  @SuppressWarnings("WeakerAccess")
+  public boolean isIpV4Packet() {
+    return etherProtocol == PacketConstants.IPv4_TYPE;
+  }
+
+  @SuppressWarnings("WeakerAccess")
+  public boolean isIpV6Packet() {
+    return etherProtocol == PacketConstants.IPv6_TYPE;
+  }
+
+  @SuppressWarnings("WeakerAccess")
+  public boolean isPPPoV6Packet() {
+    return etherProtocol == PacketConstants.PPPoV6_TYPE;
+  }
+
+  @SuppressWarnings("WeakerAccess")
+  public boolean isTcpPacket() {
+    return protocol == PacketConstants.TCP_PROTOCOL;
+  }
+
+  @SuppressWarnings("WeakerAccess")
+  public boolean isUdpPacket() {
+    return protocol == PacketConstants.UDP_PROTOCOL;
+  }
+
+  @SuppressWarnings("WeakerAccess")
+  public boolean isArpPacket() {
+    return protocol == PacketConstants.ARP_PROTOCOL;
+  }
+
+  @SuppressWarnings("WeakerAccess")
+  public boolean isIcmpPacket() {
+    return protocol == PacketConstants.ICMP_PROTOCOL;
+  }
+
+  public long getSessionHash() {
+    if (isTcpPacket()) {
+      Murmur128 h1 = new Murmur128(1, 2);
+      byte[] buf = getIpAddressBytes(true);
+      if (buf == null) {
+        return 0;
+      }
+      h1.hash(buf, 0, buf.length);
+      h1.hash(getSrc_port());
+
+      Murmur128 h2 = new Murmur128(1, 2);
+      buf = getIpAddressBytes(false);
+      if (buf == null) {
+        return 0;
+      }
+      h2.hash(buf, 0, buf.length);
+      h2.hash(getDst_port());
+
+      return h1.digest64() ^ h2.digest64();
+    } else {
+      return 0;
+    }
+  }
+
+  public long getTimestamp() {
+    return timestamp;
+  }
+
+  public int getPacketLength() {
+    return packetLength;
+  }
+
+  public InetAddress getSrc_ip() {
+    return getIPAddress(true);
+  }
+
+  public InetAddress getDst_ip() {
+    return getIPAddress(false);
+  }
+
+  public String getEthernetSource() {
+    return getEthernetAddress(PacketConstants.ETHER_SRC_OFFSET);
+  }
+
+  public String getEthernetDestination() {
+    return getEthernetAddress(PacketConstants.ETHER_DST_OFFSET);
+  }
+
+  public int getSequenceNumber() {
+    if (isTcpPacket()) {
+      int sequenceOffset = PacketConstants.ETHER_HEADER_LENGTH + getIPHeaderLength() + getTCPHeaderLength(raw) + 4;
+      return Math.abs(convertInt(raw, sequenceOffset));
+    } else {
+      return 0;
+    }
+  }
+
+  public int getSrc_port() {
+    if (isPPPoV6Packet()) {
+      return getPort(64);
+    }
+    if (isIpV6Packet()) {
+      if (isRoutingV6) {
+        return getPort(136);
+      }
+      return getPort(40);
+    }
+    return getPort(0);
+  }
+
+  public int getDst_port() {
+    if (isPPPoV6Packet()) {
+      return getPort(66);
+    }
+    if (isIpV6Packet()) {
+      if (isRoutingV6) {
+        return getPort(138);
+      }
+      return getPort(42);
+    }
+    return getPort(2);
+  }
+
+  public byte[] getData() {
+    int payloadDataStart = getIPHeaderLength();
+    if (isTcpPacket()) {
+      payloadDataStart += this.getTCPHeaderLength(raw);
+    } else if (isUdpPacket()) {
+      payloadDataStart += this.getUDPHeaderLength();
+    } else {
+      return null;
+    }
+    byte[] data = null;
+    if (packetLength >= payloadDataStart) {
+      data = new byte[packetLength - payloadDataStart];
+      System.arraycopy(raw, ipOffset + payloadDataStart, data, 0, data.length);
+    }
+    return data;
+  }
+
+  private InetAddress getIPAddress(final boolean src) {
+    byte[] ipBuffer = getIpAddressBytes(src);
+    if (ipBuffer == null) {
+      return null;
+    }
+    try {
+      return InetAddress.getByAddress(ipBuffer);
+    } catch (UnknownHostException e) {
+      return null;
+    }
+  }
+
+  private byte[] getIpAddressBytes(final boolean src) {
+    int srcPos;
+    byte[] ipBuffer;
+    int byteShift = 0;
+    if (isIpV4Packet()) {
+      ipBuffer = new byte[4];
+      srcPos = src ? PacketConstants.IP4_SRC_OFFSET : PacketConstants.IP4_DST_OFFSET;
+    } else if (isIpV6Packet()) {
+      ipBuffer = new byte[16];
+      if (isRoutingV6) {
+        byteShift = 96;
+      }
+      srcPos = src ? PacketConstants.IP6_SRC_OFFSET + byteShift : PacketConstants.IP6_DST_OFFSET + byteShift;
+    } else if (isPPPoV6Packet()) {
+      ipBuffer = new byte[16];
+      srcPos = src ? PacketConstants.IP6_SRC_OFFSET + PacketConstants.PPPoV6_IP_OFFSET : PacketConstants.IP6_DST_OFFSET + PacketConstants.PPPoV6_IP_OFFSET;
+    } else {
+      return null;
+    }
+    System.arraycopy(raw, etherOffset + srcPos, ipBuffer, 0, ipBuffer.length);
+    return ipBuffer;
+  }
+
+  private int getIPHeaderLength() {
+    return (raw[etherOffset + PacketConstants.VER_IHL_OFFSET] & 0xF) * 4;
+  }
+
+  private int getTCPHeaderLength(final byte[] packet) {
+    final int inTCPHeaderDataOffset = 12;
+
+    // tcp packet header can have options
+    int dataOffset = etherOffset + getIPHeaderLength() + inTCPHeaderDataOffset;
+    return 20 + ((packet[dataOffset] >> 4) & 0xF) * 4;
+  }
+
+  private int getUDPHeaderLength() {
+    return 8;
+  }
+
+  private int ipV4HeaderLength() {
+    return (getByte(raw, ipOffset) & 0xf) * 4;
+  }
+
+  private int ipVersion() {
+    return getByte(raw, ipOffset) >>> 4;
+  }
+
+  private void decodePcapHeader(final byte[] header, final boolean byteOrder, final int maxLength, final int offset) {
+    timestamp = getTimestamp(header, byteOrder, offset);
+    originalLength = getIntFileOrder(byteOrder, header, offset + PacketConstants.ORIGINAL_LENGTH_OFFSET);
+    packetLength = getIntFileOrder(byteOrder, header, offset + PacketConstants.ACTUAL_LENGTH_OFFSET);
+    Preconditions.checkState(originalLength < maxLength,
+        "Packet too long (%d bytes)", originalLength);
+  }
+
+  private long getTimestamp(final byte[] header, final boolean byteOrder, final int offset) {
+    return getIntFileOrder(byteOrder, header, offset + PacketConstants.TIMESTAMP_OFFSET) * 1000L +
+        getIntFileOrder(byteOrder, header, offset + PacketConstants.TIMESTAMP_MICRO_OFFSET) / 1000L;
+  }
+
+  private void decodeEtherPacket() {
+    etherProtocol = getShort(raw, etherOffset + PacketConstants.PACKET_PROTOCOL_OFFSET);
+    ipOffset = etherOffset + PacketConstants.IP_OFFSET;
+    if (isIpV4Packet()) {
+      protocol = processIpV4Packet();
+    } else if (isIpV6Packet()) {
+      protocol = processIpV6Packet();
+    } else if (isPPPoV6Packet()) {
+      protocol = getByte(raw, etherOffset + 48);
+    }
+    // everything is decoded lazily
+  }
+
+  private int processIpV4Packet() {
+    validateIpV4Packet();
+    return getByte(raw, ipOffset + 9);
+  }
+
+  private int processIpV6Packet() {
+    Preconditions.checkState(ipVersion() == 6, "Should have seen IP version 6, got %s", ipVersion());
+    int headerLength = 40;
+    int nextHeader = raw[ipOffset + 6] & 0xff;
+    while (nextHeader != PacketConstants.TCP_PROTOCOL && nextHeader != PacketConstants.UDP_PROTOCOL && nextHeader != PacketConstants.NO_NEXT_HEADER) {
+      switch (nextHeader) {
+        case PacketConstants.FRAGMENT_V6:
+          nextHeader = getByte(raw, ipOffset + headerLength);
+          headerLength += 8;
+          break;
+        case PacketConstants.ROUTING_V6:
+          isRoutingV6 = true;
+          nextHeader = getByte(raw, ipOffset + headerLength + 15);
+          headerLength += (getByte(raw, ipOffset + headerLength) + 1) * 8;
+          break;
+        case PacketConstants.HOP_BY_HOP_EXTENSION_V6:
+        case PacketConstants.DESTINATION_OPTIONS_V6:
+        case PacketConstants.AUTHENTICATION_V6:
+        case PacketConstants.ENCAPSULATING_SECURITY_V6:
+        case PacketConstants.MOBILITY_EXTENSION_V6:
+          nextHeader = getByte(raw, ipOffset + headerLength);
+          headerLength += (getByte(raw, ipOffset + headerLength) + 1) * 8;
+          break;
+        default:
+          //noinspection ConstantConditions
+          Preconditions.checkState(false, "Unknown V6 extension or protocol: ", nextHeader);
+          return getByte(raw, ipOffset + headerLength);
+      }
+    }
+    return nextHeader;
+  }
+
+  private void validateIpV4Packet() {
+    Preconditions.checkState(ipVersion() == 4, "Should have seen IP version 4, got %s", ipVersion());
+    int n = ipV4HeaderLength();
+    Preconditions.checkState(n >= 20 && n < 200, "Invalid header length: %s", n);
+  }
+
+  private String getEthernetAddress(int offset) {
+    byte[] r = new byte[6];
+    System.arraycopy(raw, etherOffset + offset, r, 0, 6);
+    StringBuilder sb = new StringBuilder();
+    for (int i = 0; i < r.length; i++) {
+      sb.append(String.format("%02X%s", r[i], (i < r.length - 1) ? ":" : ""));
+    }
+    return sb.toString();
+  }
+
+  private int getPort(int offset) {
+    int dstPortOffset = ipOffset + getIPHeaderLength() + offset;
+    return convertShort(raw, dstPortOffset);
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/f5b975ad/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/PacketConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/PacketConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/PacketConstants.java
new file mode 100644
index 0000000..003f87e
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/PacketConstants.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.pcap.decoder;
+
+@SuppressWarnings("WeakerAccess")
+public final class PacketConstants {
+
+  public static final int PCAP_HEADER_SIZE = 4 * 4;
+
+  public static final int TIMESTAMP_OFFSET = 0;
+  public static final int TIMESTAMP_MICRO_OFFSET = 4;
+  public static final int ORIGINAL_LENGTH_OFFSET = 8;
+  public static final int ACTUAL_LENGTH_OFFSET = 12;
+
+  public static final int PACKET_PROTOCOL_OFFSET = 12;
+
+  public static final byte ARP_PROTOCOL = 0;
+  public static final byte ICMP_PROTOCOL = 1;
+  public static final byte TCP_PROTOCOL = 6;
+  public static final byte UDP_PROTOCOL = 17;
+
+  public static final int HOP_BY_HOP_EXTENSION_V6 = 0;
+  public static final int DESTINATION_OPTIONS_V6 = 60;
+  public static final int ROUTING_V6 = 43;
+  public static final int FRAGMENT_V6 = 44;
+  public static final int AUTHENTICATION_V6 = 51;
+  public static final int ENCAPSULATING_SECURITY_V6 = 50;
+  public static final int MOBILITY_EXTENSION_V6 = 135;
+  public static final int NO_NEXT_HEADER = 59;
+  public static final int UDP_HEADER_LENGTH = 8;
+  public static final int VER_IHL_OFFSET = 14;
+
+  public static final int ETHER_HEADER_LENGTH = 14;
+  public static final int ETHER_TYPE_OFFSET = 12;
+
+  public static final int IPv4_TYPE = 0x800;
+  public static final int IPv6_TYPE = 0x86dd;
+  public static final int PPPoV6_TYPE = 0x8864;
+
+
+  public static final int IP_OFFSET = 14;
+
+  public static final int IP4_SRC_OFFSET = IP_OFFSET + 12;
+  public static final int IP4_DST_OFFSET = IP_OFFSET + 16;
+
+  public static final int IP6_SRC_OFFSET = IP_OFFSET + 8;
+  public static final int IP6_DST_OFFSET = IP_OFFSET + 24;
+  public static final int ETHER_DST_OFFSET = 0;
+  public static final int ETHER_SRC_OFFSET = 6;
+
+  public static final int PPPoV6_IP_OFFSET = 28;
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/f5b975ad/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/PacketDecoder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/PacketDecoder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/PacketDecoder.java
new file mode 100644
index 0000000..704c3fd
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/PacketDecoder.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.pcap.decoder;
+
+import com.google.common.base.Preconditions;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import static org.apache.drill.exec.store.pcap.PcapFormatUtils.getInt;
+import static org.apache.drill.exec.store.pcap.PcapFormatUtils.getIntFileOrder;
+import static org.apache.drill.exec.store.pcap.PcapFormatUtils.getShortFileOrder;
+
+public class PacketDecoder {
+  //  typedef struct pcap_hdr_s {
+  //      guint32 magic_number;   /* magic number */
+  //      guint16 version_major;  /* major version number */
+  //      guint16 version_minor;  /* minor version number */
+  //      gint32  thiszone;       /* GMT to local correction */
+  //      guint32 sigfigs;        /* accuracy of timestamps */
+  //      guint32 snaplen;        /* max length of captured packets, in octets */
+  //      guint32 network;        /* data link type */
+  //  } pcap_hdr_t;
+  //  magic_number: used to detect the file format itself and the byte ordering. The writing application writes
+  //    0xa1b2c3d4 with its native byte ordering format into this field. The reading application will read
+  //    either 0xa1b2c3d4 (identical) or 0xd4c3b2a1 (swapped). If the reading application reads the swapped
+  //    0xd4c3b2a1 value, it knows that all the following fields will have to be swapped too. For
+  //    nanosecond-resolution files, the writing application writes 0xa1b23c4d, with the two nibbles
+  //    of the two lower-order bytes swapped, and the reading application will read either 0xa1b23c4d
+  //    (identical) or 0x4d3cb2a1 (swapped).
+  //  version_major, version_minor: the version number of this file format (current version is 2.4)
+  //  thiszone: the correction time in seconds between GMT (UTC) and the local timezone of the following
+  //     packet header timestamps. Examples: If the timestamps are in GMT (UTC), thiszone is simply 0.
+  //     If the timestamps are in Central European time (Amsterdam, Berlin, ...) which is GMT + 1:00,
+  //     thiszone must be -3600. In practice, time stamps are always in GMT, so thiszone is always 0.
+  //  sigfigs: in theory, the accuracy of time stamps in the capture; in practice, all tools set it to 0
+  //  snaplen: the "snapshot length" for the capture (typically 65535 or even more, but might be limited
+  //     by the user), see: incl_len vs. orig_len below
+  //  network: link-layer header type, specifying the type of headers at the beginning of the packet (e.g.
+  //     1 for Ethernet, see tcpdump.org's link-layer header types page for details); this can be various
+  //     types such as 802.11, 802.11 with various radio information, PPP, Token Ring, FDDI, etc.
+  private static final int GLOBAL_HEADER_SIZE = 24;
+  private static final int PCAP_MAGIC_LITTLE_ENDIAN = 0xD4C3B2A1;
+  private static final int PCAP_MAGIC_NUMBER = 0xA1B2C3D4;
+
+
+  private final int maxLength;
+  private final int network;
+  private boolean bigEndian;
+
+  private InputStream input;
+
+  public PacketDecoder(final InputStream input) throws IOException {
+    this.input = input;
+    byte[] globalHeader = new byte[GLOBAL_HEADER_SIZE];
+    int n = input.read(globalHeader);
+    if (n != globalHeader.length) {
+      throw new IOException("Can't read PCAP file header");
+    }
+    switch (getInt(globalHeader, 0)) {
+      case PCAP_MAGIC_NUMBER:
+        bigEndian = true;
+        break;
+      case PCAP_MAGIC_LITTLE_ENDIAN:
+        bigEndian = false;
+        break;
+      default:
+        //noinspection ConstantConditions
+        Preconditions.checkState(false,
+            String.format("Bad magic number = %08x", getIntFileOrder(bigEndian, globalHeader, 0)));
+    }
+    Preconditions.checkState(getShortFileOrder(bigEndian, globalHeader, 4) == 2, "Wanted major version == 2");
+    maxLength = getIntFileOrder(bigEndian, globalHeader, 16);
+    network = getIntFileOrder(bigEndian, globalHeader, 20);
+  }
+
+  public int decodePacket(final byte[] buffer, final int offset, Packet p) {
+    return p.decodePcap(buffer, offset, bigEndian, maxLength);
+  }
+
+  public Packet packet() {
+    return new Packet();
+  }
+
+  public int getNetwork() {
+    return network;
+  }
+
+  public boolean isBigEndian() {
+    return bigEndian;
+  }
+
+  public Packet nextPacket() throws IOException {
+    Packet r = new Packet();
+    if (r.readPcap(input, bigEndian, maxLength)) {
+      return r;
+    } else {
+      return null;
+    }
+  }
+}
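
A sketch of reading a capture end to end with the stream-based API above (the
file path is hypothetical; imports for the decoder classes and java.io are
assumed):

    try (InputStream in = new FileInputStream("/tmp/capture.pcap")) {
      PacketDecoder decoder = new PacketDecoder(in);
      Packet p;
      while ((p = decoder.nextPacket()) != null) {
        if (p.isTcpPacket()) {
          System.out.printf("%s:%d -> %s:%d%n",
              p.getSrc_ip().getHostAddress(), p.getSrc_port(),
              p.getDst_ip().getHostAddress(), p.getDst_port());
        }
      }
    }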

http://git-wip-us.apache.org/repos/asf/drill/blob/f5b975ad/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/dto/ColumnDto.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/dto/ColumnDto.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/dto/ColumnDto.java
new file mode 100644
index 0000000..5ff3989
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/dto/ColumnDto.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.pcap.dto;
+
+import org.apache.drill.exec.store.pcap.schema.PcapTypes;
+
+import java.util.Objects;
+
+public class ColumnDto {
+
+  private final String columnName;
+  private final PcapTypes columnType;
+
+  public ColumnDto(String columnName, PcapTypes columnType) {
+    this.columnName = columnName;
+    this.columnType = columnType;
+  }
+
+  public String getColumnName() {
+    return columnName;
+  }
+
+  public PcapTypes getColumnType() {
+    return columnType;
+  }
+
+  public boolean isNullable() {
+    return true;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    ColumnDto columnDto = (ColumnDto) o;
+    return Objects.equals(columnName, columnDto.columnName) &&
+        columnType == columnDto.columnType;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(columnName, columnType);
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/f5b975ad/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/package-info.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/package-info.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/package-info.java
new file mode 100644
index 0000000..bb27283
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * For discussion of the design and implementation of this format plugin, see:
+ * @see <a href="https://issues.apache.org/jira/browse/DRILL-5432"> Jira</a>
+ * @see <a href="https://github.com/apache/drill/pull/831"> pull request</a>
+ */
+package org.apache.drill.exec.store.pcap;

http://git-wip-us.apache.org/repos/asf/drill/blob/f5b975ad/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/schema/PcapTypes.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/schema/PcapTypes.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/schema/PcapTypes.java
new file mode 100644
index 0000000..5c6df71
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/schema/PcapTypes.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.pcap.schema;
+
+public enum PcapTypes {
+  INTEGER,
+  STRING,
+  LONG,
+  TIMESTAMP
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/f5b975ad/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/schema/Schema.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/schema/Schema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/schema/Schema.java
new file mode 100644
index 0000000..b3e7722
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/schema/Schema.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.pcap.schema;
+
+import org.apache.drill.exec.store.pcap.dto.ColumnDto;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class Schema {
+
+  private final List<ColumnDto> columns = new ArrayList<>();
+
+  public Schema() {
+    setupStructure();
+  }
+
+  private void setupStructure() {
+    columns.add(new ColumnDto("type", PcapTypes.STRING));
+    columns.add(new ColumnDto("network", PcapTypes.INTEGER));
+    columns.add(new ColumnDto("timestamp", PcapTypes.TIMESTAMP));
+    columns.add(new ColumnDto("src_ip", PcapTypes.STRING));
+    columns.add(new ColumnDto("dst_ip", PcapTypes.STRING));
+    columns.add(new ColumnDto("src_port", PcapTypes.INTEGER));
+    columns.add(new ColumnDto("dst_port", PcapTypes.INTEGER));
+    columns.add(new ColumnDto("src_mac_address", PcapTypes.STRING));
+    columns.add(new ColumnDto("dst_mac_address", PcapTypes.STRING));
+    columns.add(new ColumnDto("tcp_session", PcapTypes.LONG));
+    columns.add(new ColumnDto("packet_length", PcapTypes.INTEGER));
+    columns.add(new ColumnDto("data", PcapTypes.STRING));
+  }
+
+  /**
+   * Returns all columns with their names and types.
+   *
+   * @return an unmodifiable list of {@link ColumnDto} instances
+   */
+  public List<ColumnDto> getColumns() {
+    return Collections.unmodifiableList(columns);
+  }
+
+  public ColumnDto getColumnByIndex(int i) {
+    return columns.get(i);
+  }
+
+  public int getNumberOfColumns() {
+    return columns.size();
+  }
+}
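
As a quick illustration (not part of the commit), the accessors above can be used to enumerate the twelve fixed columns that every pcap query exposes:

    Schema schema = new Schema();
    for (ColumnDto column : schema.getColumns()) {
      System.out.printf("%-16s %s (nullable=%b)%n",
          column.getColumnName(), column.getColumnType(), column.isNullable());
    }
    // prints, for example: src_ip           STRING (nullable=true)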

http://git-wip-us.apache.org/repos/asf/drill/blob/f5b975ad/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json b/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json
index b5485d2..18a6967 100644
--- a/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json
+++ b/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json
@@ -41,6 +41,9 @@
           type: "json",
           extensions: [ "json" ]
         },
+        "pcap" : {
+          type: "pcap"
+        },
         "avro" : {
           type: "avro"
         },

http://git-wip-us.apache.org/repos/asf/drill/blob/f5b975ad/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFormatPluginOptionExtractor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFormatPluginOptionExtractor.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFormatPluginOptionExtractor.java
index 2cbc09a..11d29a8 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFormatPluginOptionExtractor.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFormatPluginOptionExtractor.java
@@ -58,6 +58,7 @@ public class TestFormatPluginOptionExtractor {
           break;
         case "json":
         case "sequencefile":
+        case "pcap":
         case "avro":
           assertEquals(d.typeName, "(type: String)", d.presentParams());
           break;

http://git-wip-us.apache.org/repos/asf/drill/blob/f5b975ad/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/ConcatPcap.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/ConcatPcap.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/ConcatPcap.java
new file mode 100644
index 0000000..aebd541
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/ConcatPcap.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.pcap;
+
+import java.io.DataOutputStream;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Concatenates PCAP files. The only trickiness is that we have to skip the
+ * 24-byte header on all but the first file.
+ */
+public class ConcatPcap {
+  public static void main(String[] args) throws IOException {
+    try (DataOutputStream out = new DataOutputStream(System.out)) {
+      if (args.length > 0) {
+        boolean first = true;
+        for (String arg : args) {
+          try (FileInputStream in = new FileInputStream(arg)) {
+            copy(first, in, out);
+            first = false;
+          }
+        }
+      } else {
+        copy(true, System.in, out);
+      }
+    }
+  }
+
+  /**
+   * Concatenates a stream onto the output.
+   *
+   * @param first Is this the beginning of the output?
+   * @param in    The data to copy to the output
+   * @param out   Where the output should go
+   * @throws IOException If there is an error reading or writing.
+   */
+  public static void copy(boolean first, InputStream in, DataOutputStream out) throws IOException {
+    byte[] buffer = new byte[1024 * 1024];
+    if (!first) {
+      // skip the 24-byte global header; skip() may skip fewer bytes than requested
+      long toSkip = 6 * 4L;
+      while (toSkip > 0) {
+        long skipped = in.skip(toSkip);
+        if (skipped <= 0) {
+          throw new IOException("Couldn't skip global header of PCAP file");
+        }
+        toSkip -= skipped;
+      }
+    }
+    int n = in.read(buffer);
+    while (n > 0) {
+      out.write(buffer, 0, n);
+      n = in.read(buffer);
+    }
+  }
+}
\ No newline at end of file
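
Besides the main() entry point, copy() can be called directly. A hypothetical caller (file names are illustrative) merges two captures while keeping only the first file's global header:

    try (DataOutputStream out = new DataOutputStream(new FileOutputStream("merged.pcap"))) {
      try (InputStream first = new FileInputStream("tcp-1.pcap")) {
        ConcatPcap.copy(true, first, out);    // keep this file's 24-byte header
      }
      try (InputStream second = new FileInputStream("tcp-2.pcap")) {
        ConcatPcap.copy(false, second, out);  // skip this file's header
      }
    }

This is the same pattern buildBigTcpFile() uses in TestPcapDecoder below.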

http://git-wip-us.apache.org/repos/asf/drill/blob/f5b975ad/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapDecoder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapDecoder.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapDecoder.java
new file mode 100644
index 0000000..56c2dbe
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapDecoder.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.pcap;
+
+import com.google.common.io.Resources;
+import org.apache.drill.BaseTestQuery;
+import org.apache.drill.exec.store.pcap.decoder.Packet;
+import org.apache.drill.exec.store.pcap.decoder.PacketDecoder;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.BufferedInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestPcapDecoder extends BaseTestQuery {
+  private static File bigFile;
+
+  @Test
+  public void testByteOrdering() throws IOException {
+    File f = File.createTempFile("foo", "pcap");
+    f.deleteOnExit();
+    try (DataOutputStream out = new DataOutputStream(new FileOutputStream(f))) {
+      writeHeader(out);
+    }
+
+    try (InputStream in = new FileInputStream(f)) {
+      PacketDecoder pd = new PacketDecoder(in);
+      assertTrue(pd.isBigEndian());
+    }
+  }
+
+  @Test
+  public void testBasics() throws IOException {
+    InputStream in = Resources.getResource("store/pcap/tcp-2.pcap").openStream();
+    PacketDecoder pd = new PacketDecoder(in);
+    Packet p = pd.packet();
+    int offset = 0;
+
+    byte[] buffer = new byte[100000];
+    int validBytes = in.read(buffer);
+    assertTrue(validBytes > 50);
+
+    offset = pd.decodePacket(buffer, offset, p);
+    offset = pd.decodePacket(buffer, offset, p);
+    assertEquals(228, offset);
+
+    assertEquals("FE:00:00:00:00:02", p.getEthernetDestination());
+    assertEquals("FE:00:00:00:00:01", p.getEthernetSource());
+    assertEquals("/192.168.0.1", p.getSrc_ip().toString());
+    assertEquals("/192.168.0.2", p.getDst_ip().toString());
+    assertEquals(161, p.getSrc_port());
+    assertEquals(0, p.getDst_port());
+  }
+
+  private static void writeHeader(DataOutputStream out) throws IOException {
+    //        typedef struct pcap_hdr_s {
+    //            guint32 magic_number;   /* magic number */
+    //            guint16 version_major;  /* major version number */
+    //            guint16 version_minor;  /* minor version number */
+    //            gint32  thiszone;       /* GMT to local correction */
+    //            guint32 sigfigs;        /* accuracy of timestamps */
+    //            guint32 snaplen;        /* max length of captured packets, in octets */
+    //            guint32 network;        /* data link type */
+    //        } pcap_hdr_t;
+    //        magic_number: used to detect the file format itself and the byte ordering. The writing
+    //            application writes 0xa1b2c3d4 with its native byte ordering format into this field.
+    //            The reading application will read either 0xa1b2c3d4 (identical) or 0xd4c3b2a1
+    //            (swapped). If the reading application reads the swapped 0xd4c3b2a1 value, it knows
+    //            that all the following fields will have to be swapped too. For nanosecond-resolution
+    //            files, the writing application writes 0xa1b23c4d, with the two nibbles of the two
+    //            lower-order bytes swapped, and the reading application will read either 0xa1b23c4d
+    //            (identical) or 0x4d3cb2a1 (swapped).
+    //        version_major, version_minor: the version number of this file format (current version is 2.4)
+    //        thiszone: the correction time in seconds between GMT (UTC) and the local timezone of the
+    //            following packet header timestamps. Examples: If the timestamps are in GMT (UTC),
+    //            thiszone is simply 0. If the timestamps are in Central European time (Amsterdam,
+    //            Berlin, ...) which is GMT + 1:00, thiszone must be -3600. In practice, time stamps
+    //            are always in GMT, so thiszone is always 0.
+    //        sigfigs: in theory, the accuracy of time stamps in the capture; in practice, all tools
+    //            set it to 0
+    //        snaplen: the "snapshot length" for the capture (typically 65535 or even more, but might
+    //            be limited by the user), see: incl_len vs. orig_len below
+    //        network: link-layer header type, specifying the type of headers at the beginning of the
+    //            packet (e.g. 1 for Ethernet, see tcpdump.org's link-layer header types page for
+    //            details); this can be various types such as 802.11, 802.11 with various radio
+    //            information, PPP, Token Ring, FDDI, etc.
+
+    out.writeInt(0xa1b2c3d4);        // PCAP magic number
+    out.writeShort(2);               // version 2.4
+    out.writeShort(4);
+    out.writeInt(0);                 // assume GMT times
+    out.writeInt(0);                 // everybody does this
+    out.writeInt(65536);             // customary length limit
+    out.writeInt(1);                 // ETHERNET
+  }
+
+  // ----------------------------------------
+  // The code from here down is useful in that it exercises the assumptions on
+  // which the entire package is based, but it doesn't really define tests.
+  // As such, it can be run as a main class, but isn't run as part of the unit tests.
+  /**
+   * Tests the speed of creating an actual object for each packet, reading the
+   * file without any buffering.
+   *
+   * @throws IOException If file can't be read.
+   */
+  private static void checkConventionalApproach() throws IOException {
+    speedRun(new FileInputStream(bigFile), " without buffering");
+  }
+
+  /**
+   * Checks the speed of creating an actual object for each packet, this time
+   * reading through a large buffer.
+   * <p>
+   * Even with decent buffering, this approach isn't very fast.
+   *
+   * @throws IOException If file can't be read.
+   */
+  private static void checkBufferedApproach() throws IOException {
+    speedRun(new BufferedInputStream(new FileInputStream(bigFile), 100000), " with buffering");
+  }
+
+  private static void speedRun(InputStream in, String msg) throws IOException {
+    PacketDecoder pd = new PacketDecoder(in);
+    Packet p = pd.nextPacket();
+    long total = 0;
+    int tcpCount = 0;
+    int udpCount = 0;
+    int allCount = 0;
+    long t0 = System.nanoTime();
+    while (p != null) {
+      total += p.getPacketLength();
+      allCount++;
+      if (p.isTcpPacket()) {
+        tcpCount++;
+      } else if (p.isUdpPacket()) {
+        udpCount++;
+      }
+      // compare to pd.decodePacket() as used in checkFastApproach
+      p = pd.nextPacket();
+    }
+    long t1 = System.nanoTime();
+    System.out.printf("\nSpeed test for per packet object%s\n", msg);
+    System.out.printf("    Read %.1f MB in %.2f s for %.1f MB/s\n", total / 1e6, (t1 - t0) / 1e9, (double) total * 1e3 / (t1 - t0));
+    System.out.printf("    %d packets, %d TCP packets, %d UDP\n", allCount, tcpCount, udpCount);
+    System.out.printf("\n\n\n");
+  }
+
+  /**
+   * Tests speed for in-place decoding. This is enormously faster than creating objects, largely
+   * because we rarely have to move any data. Instead, we can examine each packet as it lies in the buffer.
+   *
+   * @throws IOException If file can't be read.
+   */
+  private static void checkFastApproach() throws IOException {
+    InputStream in = new FileInputStream(bigFile);
+    PacketDecoder pd = new PacketDecoder(in);
+    Packet p = pd.packet();
+
+    byte[] buffer = new byte[100000];
+    int validBytes = in.read(buffer);
+
+    int offset = 0;
+    long total = 0;
+    int tcpCount = 0;
+    int udpCount = 0;
+    int allCount = 0;
+    long t0 = System.nanoTime();
+    while (offset < validBytes) {
+      // get new data and shift current data to beginning of buffer if there is any danger
+      // of straddling the buffer end in the next packet
+      // even with jumbo packets this should be enough space to guarantee parsing
+      if (validBytes - offset < 9000) {
+        System.arraycopy(buffer, offset, buffer, 0, validBytes - offset);
+        validBytes = validBytes - offset;
+        offset = 0;
+
+        int n = in.read(buffer, validBytes, buffer.length - validBytes);
+        if (n > 0) {
+          validBytes += n;
+        }
+      }
+
+      // decode the packet as it lies
+      offset = pd.decodePacket(buffer, offset, p);
+      total += p.getPacketLength();
+      allCount++;
+      if (p.isTcpPacket()) {
+        tcpCount++;
+      } else if (p.isUdpPacket()) {
+        udpCount++;
+      }
+    }
+    long t1 = System.nanoTime();
+    System.out.printf("\nSpeed test for in-place packet decoding\n");
+    System.out.printf("    Read %.1f MB in %.2f s for %.1f MB/s\n", total / 1e6, (t1 - t0) / 1e9, (double) total * 1e3 / (t1 - t0));
+    System.out.printf("    %d packets, %d TCP packets, %d UDP\n", allCount, tcpCount, udpCount);
+    System.out.printf("\n\n\n");
+  }
+
+
+  /**
+   * Creates an ephemeral file of about a GB in size.
+   *
+   * @throws IOException If input file can't be read or output can't be written.
+   */
+  private static void buildBigTcpFile() throws IOException {
+    bigFile = File.createTempFile("tcp", ".pcap");
+    bigFile.deleteOnExit();
+    boolean first = true;
+    System.out.printf("Building large test file\n");
+    try (DataOutputStream out = new DataOutputStream(new FileOutputStream(bigFile))) {
+      for (int i = 0; i < 1000e6 / (29208 - 24) + 1; i++) {
+        // might be faster to keep this open and rewind each time, but
+        // that is hard to do with a resource, especially if it comes
+        // from the class path instead of files.
+        try (InputStream in = Resources.getResource("store/pcap/tcp-2.pcap").openStream()) {
+          ConcatPcap.copy(first, in, out);
+        }
+        first = false;
+      }
+    }
+  }
+
+  public static void main(String[] args) throws IOException {
+    System.out.printf("Checking speeds for various approaches\n\n");
+    buildBigTcpFile();
+    checkConventionalApproach();
+    checkBufferedApproach();
+    checkFastApproach();
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/f5b975ad/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapRecordReader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapRecordReader.java
new file mode 100644
index 0000000..ce2a4f8
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapRecordReader.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.pcap;
+
+import org.apache.drill.BaseTestQuery;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.rpc.user.QueryDataBatch;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+
+public class TestPcapRecordReader extends BaseTestQuery {
+
+  @Test
+  public void testStarQuery() throws Exception {
+    runSQLVerifyCount("select * from dfs.`${WORKING_PATH}/src/test/resources/store/pcap/tcp-1.pcap`", 16);
+    runSQLVerifyCount("select distinct DST_IP from dfs.`${WORKING_PATH}/src/test/resources/store/pcap/tcp-1.pcap`", 1);
+    runSQLVerifyCount("select distinct DsT_IP from dfs.`${WORKING_PATH}/src/test/resources/store/pcap/tcp-1.pcap`", 1);
+    runSQLVerifyCount("select distinct dst_ip from dfs.`${WORKING_PATH}/src/test/resources/store/pcap/tcp-1.pcap`", 1);
+  }
+
+  @Test
+  public void testCountQuery() throws Exception {
+    runSQLVerifyCount("select count(*) from dfs.`${WORKING_PATH}/src/test/resources/store/pcap/tcp-1.pcap`", 1);
+    runSQLVerifyCount("select count(*) from dfs.`${WORKING_PATH}/src/test/resources/store/pcap/tcp-2.pcap`", 1);
+  }
+
+  @Test
+  public void testDistinctQuery() throws Exception {
+    runSQLVerifyCount("select distinct * from dfs.`${WORKING_PATH}/src/test/resources/store/pcap/tcp-1.pcap`", 1);
+  }
+
+  private void runSQLVerifyCount(String sql, int expectedRowCount) throws Exception {
+    List<QueryDataBatch> results = runSQLWithResults(sql);
+    printResultAndVerifyRowCount(results, expectedRowCount);
+  }
+
+  private List<QueryDataBatch> runSQLWithResults(String sql) throws Exception {
+    return testSqlWithResults(sql);
+  }
+
+  private void printResultAndVerifyRowCount(List<QueryDataBatch> results,
+                                            int expectedRowCount) throws SchemaChangeException {
+    setColumnWidth(25);
+    int rowCount = printResult(results);
+    if (expectedRowCount != -1) {
+      Assert.assertEquals(expectedRowCount, rowCount);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/f5b975ad/exec/java-exec/src/test/resources/store/pcap/data-1.pcap
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/store/pcap/data-1.pcap b/exec/java-exec/src/test/resources/store/pcap/data-1.pcap
new file mode 100644
index 0000000..58adc5d
Binary files /dev/null and b/exec/java-exec/src/test/resources/store/pcap/data-1.pcap differ

http://git-wip-us.apache.org/repos/asf/drill/blob/f5b975ad/exec/java-exec/src/test/resources/store/pcap/data-2.pcap
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/store/pcap/data-2.pcap b/exec/java-exec/src/test/resources/store/pcap/data-2.pcap
new file mode 100644
index 0000000..1f23ee8
Binary files /dev/null and b/exec/java-exec/src/test/resources/store/pcap/data-2.pcap differ

http://git-wip-us.apache.org/repos/asf/drill/blob/f5b975ad/exec/java-exec/src/test/resources/store/pcap/tcp-1.pcap
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/store/pcap/tcp-1.pcap b/exec/java-exec/src/test/resources/store/pcap/tcp-1.pcap
new file mode 100644
index 0000000..bb3364e
Binary files /dev/null and b/exec/java-exec/src/test/resources/store/pcap/tcp-1.pcap differ

http://git-wip-us.apache.org/repos/asf/drill/blob/f5b975ad/exec/java-exec/src/test/resources/store/pcap/tcp-2.pcap
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/store/pcap/tcp-2.pcap b/exec/java-exec/src/test/resources/store/pcap/tcp-2.pcap
new file mode 100644
index 0000000..2545d18
Binary files /dev/null and b/exec/java-exec/src/test/resources/store/pcap/tcp-2.pcap differ

http://git-wip-us.apache.org/repos/asf/drill/blob/f5b975ad/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
index e4261df..bf38dce 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
@@ -513,6 +513,10 @@ public final class UserBitShared {
      * <code>AVRO_SUB_SCAN = 36;</code>
      */
     AVRO_SUB_SCAN(36, 36),
+    /**
+     * <code>PCAP_SUB_SCAN = 37;</code>
+     */
+    PCAP_SUB_SCAN(37, 37),
     ;
 
     /**
@@ -663,6 +667,10 @@ public final class UserBitShared {
      * <code>AVRO_SUB_SCAN = 36;</code>
      */
     public static final int AVRO_SUB_SCAN_VALUE = 36;
+    /**
+     * <code>PCAP_SUB_SCAN = 37;</code>
+     */
+    public static final int PCAP_SUB_SCAN_VALUE = 37;
 
 
     public final int getNumber() { return value; }
@@ -706,6 +714,7 @@ public final class UserBitShared {
         case 34: return WINDOW;
         case 35: return NESTED_LOOP_JOIN;
         case 36: return AVRO_SUB_SCAN;
+        case 37: return PCAP_SUB_SCAN;
         default: return null;
       }
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/f5b975ad/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java b/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java
index b21d3ae..a795f55 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java
@@ -58,7 +58,8 @@ public enum CoreOperatorType implements com.dyuproject.protostuff.EnumLite<CoreO
     HBASE_SUB_SCAN(33),
     WINDOW(34),
     NESTED_LOOP_JOIN(35),
-    AVRO_SUB_SCAN(36);
+    AVRO_SUB_SCAN(36),
+    PCAP_SUB_SCAN(37);
     
     public final int number;
     
@@ -113,6 +114,7 @@ public enum CoreOperatorType implements com.dyuproject.protostuff.EnumLite<CoreO
             case 34: return WINDOW;
             case 35: return NESTED_LOOP_JOIN;
             case 36: return AVRO_SUB_SCAN;
+            case 37: return PCAP_SUB_SCAN;
             default: return null;
         }
     }