You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by vo...@apache.org on 2019/11/26 13:33:07 UTC

[drill] 01/02: DRILL-7443: Enable PCAP Plugin to Reassemble TCP Streams

This is an automated email from the ASF dual-hosted git repository.

volodymyr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit bd549b9cbfa538038ec440aa8c623b3bdb40d50b
Author: Charles Givre <cg...@apache.org>
AuthorDate: Fri Nov 22 16:04:29 2019 -0500

    DRILL-7443: Enable PCAP Plugin to Reassemble TCP Streams
    
    closes #1898
---
 .../drill/exec/store/pcap/PcapBatchReader.java     | 203 ++++++++--
 .../drill/exec/store/pcap/PcapFormatConfig.java    |   8 +-
 .../drill/exec/store/pcap/PcapRecordReader.java    | 418 ---------------------
 .../drill/exec/store/pcap/decoder/Packet.java      |  49 ++-
 .../exec/store/pcap/decoder/TcpHandshake.java      | 144 +++++++
 .../drill/exec/store/pcap/decoder/TcpSession.java  | 321 ++++++++++++++++
 .../drill/exec/store/pcap/schema/PcapTypes.java    |   5 +-
 .../drill/exec/store/pcap/schema/Schema.java       |  72 ++--
 .../store/dfs/TestFormatPluginOptionExtractor.java |   4 +-
 .../drill/exec/store/pcap/TestPcapDecoder.java     |   8 +-
 .../drill/exec/store/pcap/TestPcapEVFReader.java   |   1 -
 .../drill/exec/store/pcap/TestSessionizePCAP.java  | 107 ++++++
 .../test/resources/store/pcap/attack-trace.pcap    | Bin 0 -> 189103 bytes
 13 files changed, 850 insertions(+), 490 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapBatchReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapBatchReader.java
index c9caa24..5e4a46c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapBatchReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapBatchReader.java
@@ -26,6 +26,7 @@ import org.apache.drill.exec.record.metadata.SchemaBuilder;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.store.pcap.decoder.Packet;
 import org.apache.drill.exec.store.pcap.decoder.PacketDecoder;
+import org.apache.drill.exec.store.pcap.decoder.TcpSession;
 import org.apache.drill.exec.store.pcap.schema.Schema;
 import org.apache.drill.exec.vector.accessor.ScalarWriter;
 import org.apache.hadoop.mapred.FileSplit;
@@ -35,12 +36,14 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
 
 import static org.apache.drill.exec.store.pcap.PcapFormatUtils.parseBytesToASCII;
 
 public class PcapBatchReader implements ManagedReader<FileSchemaNegotiator> {
 
-  public static final int BUFFER_SIZE = 500_000;
+  protected static final int BUFFER_SIZE = 500_000;
 
   private static final Logger logger = LoggerFactory.getLogger(PcapBatchReader.class);
 
@@ -116,17 +119,56 @@ public class PcapBatchReader implements ManagedReader<FileSchemaNegotiator> {
 
   private ScalarWriter isCorruptWriter;
 
+  private PcapReaderConfig readerConfig;
+
+
+  // Writers for TCP Sessions
+  private ScalarWriter sessionStartTimeWriter;
+
+  private ScalarWriter sessionEndTimeWriter;
+
+  private ScalarWriter sessionDurationWriter;
+
+  private ScalarWriter connectionTimeWriter;
+
+  private ScalarWriter packetCountWriter;
+
+  private ScalarWriter originPacketCounterWriter;
+
+  private ScalarWriter remotePacketCounterWriter;
+
+  private ScalarWriter originDataVolumeWriter;
+
+  private ScalarWriter remoteDataVolumeWriter;
+
+  private ScalarWriter hostDataWriter;
+
+  private ScalarWriter remoteDataWriter;
+
+
+  private Map<Long, TcpSession> sessionQueue;
+
 
   public static class PcapReaderConfig {
 
     protected final PcapFormatPlugin plugin;
 
+    public boolean sessionizeTCPStreams; // copied from config.sessionizeTCPStreams in the constructor
+
+    private PcapFormatConfig config; // format config obtained from plugin.getConfig()
+
     public PcapReaderConfig(PcapFormatPlugin plugin) {
       this.plugin = plugin;
+      this.config = plugin.getConfig();
+      this.sessionizeTCPStreams = config.sessionizeTCPStreams; // value as of construction time
     }
   }
 
   public PcapBatchReader(PcapReaderConfig readerConfig) {
+    this.readerConfig = readerConfig;
+    if (readerConfig.sessionizeTCPStreams) { // otherwise sessionQueue stays null; close() null-checks it
+      sessionQueue = new HashMap<>(); // in-progress TCP sessions, keyed by the packet session hash
+    }
   }
 
   @Override
@@ -134,46 +176,14 @@ public class PcapBatchReader implements ManagedReader<FileSchemaNegotiator> {
     split = negotiator.split();
     openFile(negotiator);
     SchemaBuilder builder = new SchemaBuilder();
-    Schema pcapSchema = new Schema();
+    Schema pcapSchema = new Schema(readerConfig.sessionizeTCPStreams);
     TupleMetadata schema = pcapSchema.buildSchema(builder);
     negotiator.setTableSchema(schema, false);
     ResultSetLoader loader = negotiator.build();
 
     // Creates writers for all fields (Since schema is known)
     rowWriter = loader.writer();
-    typeWriter = rowWriter.scalar("type");
-    timestampWriter = rowWriter.scalar("packet_timestamp");
-    timestampMicroWriter = rowWriter.scalar("timestamp_micro");
-    networkWriter = rowWriter.scalar("network");
-    srcMacAddressWriter = rowWriter.scalar("src_mac_address");
-    dstMacAddressWriter = rowWriter.scalar("dst_mac_address");
-    dstIPWriter = rowWriter.scalar("dst_ip");
-    srcIPWriter = rowWriter.scalar("src_ip");
-    srcPortWriter = rowWriter.scalar("src_port");
-    dstPortWriter = rowWriter.scalar("dst_port");
-    packetLengthWriter = rowWriter.scalar("packet_length");
-
-    //Writers for TCP Packets
-    tcpSessionWriter = rowWriter.scalar("tcp_session");
-    tcpSequenceWriter = rowWriter.scalar("tcp_sequence");
-    tcpAckNumberWriter = rowWriter.scalar("tcp_ack");
-    tcpFlagsWriter = rowWriter.scalar("tcp_flags");
-    tcpParsedFlagsWriter = rowWriter.scalar("tcp_parsed_flags");
-    tcpNsWriter = rowWriter.scalar("tcp_flags_ns");
-    tcpCwrWriter = rowWriter.scalar("tcp_flags_cwr");
-    tcpEceWriter = rowWriter.scalar("tcp_flags_ece");
-    tcpFlagsEceEcnCapableWriter = rowWriter.scalar("tcp_flags_ece_ecn_capable");
-    tcpFlagsCongestionWriter = rowWriter.scalar("tcp_flags_ece_congestion_experienced");
-
-    tcpUrgWriter = rowWriter.scalar("tcp_flags_urg");
-    tcpAckWriter = rowWriter.scalar("tcp_flags_ack");
-    tcpPshWriter = rowWriter.scalar("tcp_flags_psh");
-    tcpRstWriter = rowWriter.scalar("tcp_flags_rst");
-    tcpSynWriter = rowWriter.scalar("tcp_flags_syn");
-    tcpFinWriter = rowWriter.scalar("tcp_flags_fin");
-
-    dataWriter = rowWriter.scalar("data");
-    isCorruptWriter = rowWriter.scalar("is_corrupt");
+    populateColumnWriters(rowWriter);
 
     return true;
   }
@@ -190,6 +200,14 @@ public class PcapBatchReader implements ManagedReader<FileSchemaNegotiator> {
 
   @Override
   public void close() {
+
+    /* This warning could occur in the event of a corrupt or incomplete PCAP file. Specifically,
+     * if a session is started in one file and the end of the session is not captured in the same file.
+     */
+    if (sessionQueue != null && !sessionQueue.isEmpty()) {
+      logger.warn("Unclosed sessions remaining in PCAP");
+    }
+
     try {
       fsStream.close();
     } catch (IOException e) {
@@ -217,7 +235,69 @@ public class PcapBatchReader implements ManagedReader<FileSchemaNegotiator> {
     }
   }
 
+  private void populateColumnWriters(RowSetLoader rowWriter) { // looks up one writer per column name
+    if (readerConfig.sessionizeTCPStreams) { // session mode: writers used by addSessionDataToTable
+      srcMacAddressWriter = rowWriter.scalar("src_mac_address");
+      dstMacAddressWriter = rowWriter.scalar("dst_mac_address");
+      dstIPWriter = rowWriter.scalar("dst_ip");
+      srcIPWriter = rowWriter.scalar("src_ip");
+      srcPortWriter = rowWriter.scalar("src_port");
+      dstPortWriter = rowWriter.scalar("dst_port");
+      sessionStartTimeWriter = rowWriter.scalar("session_start_time");
+      sessionEndTimeWriter = rowWriter.scalar("session_end_time");
+      sessionDurationWriter = rowWriter.scalar("session_duration");
+      connectionTimeWriter = rowWriter.scalar("connection_time");
+      tcpSessionWriter = rowWriter.scalar("tcp_session");
+      packetCountWriter = rowWriter.scalar("total_packet_count");
+      hostDataWriter = rowWriter.scalar("data_from_originator");
+      remoteDataWriter = rowWriter.scalar("data_from_remote");
+      // Per-direction packet counts and data volumes, plus the corruption flag
+      originPacketCounterWriter = rowWriter.scalar("packet_count_from_origin");
+      remotePacketCounterWriter = rowWriter.scalar("packet_count_from_remote");
+      originDataVolumeWriter = rowWriter.scalar("data_volume_from_origin");
+      remoteDataVolumeWriter = rowWriter.scalar("data_volume_from_remote");
+      isCorruptWriter = rowWriter.scalar("is_corrupt");
+      // Packet-only writers (type, timestamps, TCP flags, ...) are not assigned in this mode
+    } else { // packet mode: writers used by addDataToTable
+      typeWriter = rowWriter.scalar("type");
+      timestampWriter = rowWriter.scalar("packet_timestamp");
+      timestampMicroWriter = rowWriter.scalar("timestamp_micro");
+      networkWriter = rowWriter.scalar("network");
+      srcMacAddressWriter = rowWriter.scalar("src_mac_address");
+      dstMacAddressWriter = rowWriter.scalar("dst_mac_address");
+      dstIPWriter = rowWriter.scalar("dst_ip");
+      srcIPWriter = rowWriter.scalar("src_ip");
+      srcPortWriter = rowWriter.scalar("src_port");
+      dstPortWriter = rowWriter.scalar("dst_port");
+      packetLengthWriter = rowWriter.scalar("packet_length");
+
+      // Writers for TCP packets: session hash, sequence/ack numbers and the flags field
+      tcpSessionWriter = rowWriter.scalar("tcp_session");
+      tcpSequenceWriter = rowWriter.scalar("tcp_sequence");
+      tcpAckNumberWriter = rowWriter.scalar("tcp_ack");
+      tcpFlagsWriter = rowWriter.scalar("tcp_flags");
+      tcpParsedFlagsWriter = rowWriter.scalar("tcp_parsed_flags");
+      tcpNsWriter = rowWriter.scalar("tcp_flags_ns");
+      tcpCwrWriter = rowWriter.scalar("tcp_flags_cwr");
+      tcpEceWriter = rowWriter.scalar("tcp_flags_ece");
+      tcpFlagsEceEcnCapableWriter = rowWriter.scalar("tcp_flags_ece_ecn_capable");
+      tcpFlagsCongestionWriter = rowWriter.scalar("tcp_flags_ece_congestion_experienced");
+      // Writers for the remaining individual TCP flag columns
+      tcpUrgWriter = rowWriter.scalar("tcp_flags_urg");
+      tcpAckWriter = rowWriter.scalar("tcp_flags_ack");
+      tcpPshWriter = rowWriter.scalar("tcp_flags_psh");
+      tcpRstWriter = rowWriter.scalar("tcp_flags_rst");
+      tcpSynWriter = rowWriter.scalar("tcp_flags_syn");
+      tcpFinWriter = rowWriter.scalar("tcp_flags_fin");
+      // Payload and corruption flag; the session-only writers are not assigned in this mode
+      dataWriter = rowWriter.scalar("data");
+      isCorruptWriter = rowWriter.scalar("is_corrupt");
+    }
+  }
+
   private boolean parseNextPacket(RowSetLoader rowWriter) {
+
+    // Decode the packet
     Packet packet = new Packet();
 
     if (offset >= validBytes) {
@@ -234,8 +314,28 @@ public class PcapBatchReader implements ManagedReader<FileSchemaNegotiator> {
       packet.setIsCorrupt(true);
       logger.debug("Invalid packet at offset {}", old);
     }
-    addDataToTable(packet, decoder.getNetwork(), rowWriter);
 
+    // If we are resessionizing the TCP Stream, add the packet to the stream
+    if (readerConfig.sessionizeTCPStreams) {
+      // If the session has not been seen before, add it to the queue
+      long sessionID = packet.getSessionHash();
+      if (!sessionQueue.containsKey(sessionID)) {
+        logger.debug("Adding session {} to session queue.", sessionID);
+        sessionQueue.put(sessionID, new TcpSession(sessionID));
+      }
+
+      // When the session is closed, write it and remove it from the session queue.
+      sessionQueue.get(sessionID).addPacket(packet);
+      if (sessionQueue.get(sessionID).connectionClosed()) {
+        // Write out the session
+        addSessionDataToTable(sessionQueue.get(sessionID), rowWriter);
+        // Remove from the queue
+        sessionQueue.remove(sessionID);
+      }
+
+    } else {
+      addDataToTable(packet, decoder.getNetwork(), rowWriter);
+    }
     return true;
   }
 
@@ -268,7 +368,35 @@ public class PcapBatchReader implements ManagedReader<FileSchemaNegotiator> {
     return true;
   }
 
-  private boolean addDataToTable(Packet packet, int networkType, RowSetLoader rowWriter) {
+  private void addSessionDataToTable(TcpSession session, RowSetLoader rowWriter) {
+    rowWriter.start(); // begin the single output row for this session
+    // Session timing: start/end timestamps plus duration and connection time as periods
+    sessionStartTimeWriter.setTimestamp(session.getSessionStartTime());
+    sessionEndTimeWriter.setTimestamp(session.getSessionEndTime());
+    sessionDurationWriter.setPeriod(session.getSessionDuration());
+    connectionTimeWriter.setPeriod(session.getConnectionTime());
+    // Endpoint addressing, session id and total packet count
+    srcMacAddressWriter.setString(session.getSrcMac());
+    dstMacAddressWriter.setString(session.getDstMac());
+    srcIPWriter.setString(session.getSrcIP());
+    dstIPWriter.setString(session.getDstIP());
+    srcPortWriter.setInt(session.getSrcPort());
+    dstPortWriter.setInt(session.getDstPort());
+    tcpSessionWriter.setLong(session.getSessionID());
+    packetCountWriter.setInt(session.getPacketCount());
+    // Per-direction counters; each data volume is the length of the data the getter returns
+    originPacketCounterWriter.setInt(session.getPacketCountFromOrigin());
+    remotePacketCounterWriter.setInt(session.getPacketCountFromRemote());
+    originDataVolumeWriter.setInt(session.getDataFromOriginator().length);
+    remoteDataVolumeWriter.setInt(session.getDataFromRemote().length);
+    isCorruptWriter.setBoolean(session.hasCorruptedData());
+    // Data from each side of the session, written in its string form
+    hostDataWriter.setString(session.getDataFromOriginatorAsString());
+    remoteDataWriter.setString(session.getDataFromRemoteAsString());
+    rowWriter.save(); // commit the row
+  }
+
+  private void addDataToTable(Packet packet, int networkType, RowSetLoader rowWriter) {
     rowWriter.start();
 
     typeWriter.setString(packet.getPacketType());
@@ -312,6 +440,5 @@ public class PcapBatchReader implements ManagedReader<FileSchemaNegotiator> {
     // TODO Parse Data Packet Here:
     // Description of work in
     // DRILL-7400: Add Packet Decoders with Interface to Drill
-    return true;
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapFormatConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapFormatConfig.java
index c06ddf9..133b5d7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapFormatConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapFormatConfig.java
@@ -34,13 +34,16 @@ public class PcapFormatConfig implements FormatPluginConfig {
   public List<String> extensions;
 
   @JsonInclude(JsonInclude.Include.NON_DEFAULT)
+  public boolean sessionizeTCPStreams = false;
+
+  @JsonInclude(JsonInclude.Include.NON_DEFAULT)
   public List<String> getExtensions() {
     return extensions == null ? DEFAULT_EXTS : extensions;
   }
 
   @Override
   public int hashCode() {
-    return Arrays.hashCode(new Object[]{extensions});
+    return Arrays.hashCode(new Object[]{extensions, sessionizeTCPStreams});
   }
 
   @Override
@@ -52,6 +55,7 @@ public class PcapFormatConfig implements FormatPluginConfig {
       return false;
     }
     PcapFormatConfig other = (PcapFormatConfig) obj;
-    return Objects.equal(extensions, other.extensions);
+    return Objects.equal(extensions, other.extensions)
+      && Objects.equal(sessionizeTCPStreams, other.sessionizeTCPStreams);
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapRecordReader.java
deleted file mode 100644
index f9b8a72..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapRecordReader.java
+++ /dev/null
@@ -1,418 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.pcap;
-
-import org.apache.drill.exec.vector.NullableBitVector;
-import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
-import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.types.TypeProtos;
-import org.apache.drill.common.types.TypeProtos.MajorType;
-import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.expr.TypeHelper;
-import org.apache.drill.exec.ops.OperatorContext;
-import org.apache.drill.exec.physical.impl.OutputMutator;
-import org.apache.drill.exec.record.MaterializedField;
-import org.apache.drill.exec.store.AbstractRecordReader;
-import org.apache.drill.exec.store.pcap.decoder.Packet;
-import org.apache.drill.exec.store.pcap.decoder.PacketDecoder;
-import org.apache.drill.exec.store.pcap.dto.ColumnDto;
-import org.apache.drill.exec.store.pcap.schema.PcapTypes;
-import org.apache.drill.exec.store.pcap.schema.Schema;
-import org.apache.drill.exec.vector.NullableBigIntVector;
-import org.apache.drill.exec.vector.NullableIntVector;
-import org.apache.drill.exec.vector.NullableTimeStampVector;
-import org.apache.drill.exec.vector.NullableVarCharVector;
-import org.apache.drill.exec.vector.ValueVector;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Map;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.apache.drill.exec.store.pcap.PcapFormatUtils.parseBytesToASCII;
-
-public class PcapRecordReader extends AbstractRecordReader {
-  static final int BUFFER_SIZE = 500_000;  // this should be relatively large relative to max packet
-
-  private static final Logger logger = LoggerFactory.getLogger(PcapRecordReader.class);
-  private static final int BATCH_SIZE = 40_000;
-
-  private OutputMutator output;
-
-  private PacketDecoder decoder;
-  private ImmutableList<ProjectedColumnInfo> projectedCols;
-  private FileSystem fs;
-
-  private byte[] buffer;
-  private int offset = 0;
-  private FSDataInputStream in;
-  private int validBytes;
-
-  private final Path pathToFile;
-  private List<SchemaPath> projectedColumns;
-
-  private static final Map<PcapTypes, MinorType> TYPES;
-
-  private static class ProjectedColumnInfo {
-    ValueVector vv;
-    ColumnDto pcapColumn;
-  }
-
-  static {
-    TYPES = ImmutableMap.<PcapTypes, TypeProtos.MinorType>builder()
-            .put(PcapTypes.STRING, MinorType.VARCHAR)
-            .put(PcapTypes.INTEGER, MinorType.INT)
-            .put(PcapTypes.LONG, MinorType.BIGINT)
-            .put(PcapTypes.TIMESTAMP, MinorType.TIMESTAMP)
-            .put(PcapTypes.BOOLEAN, MinorType.BIT)
-            .build();
-  }
-
-  public PcapRecordReader(final Path pathToFile,
-                          final FileSystem fileSystem,
-                          final List<SchemaPath> projectedColumns) {
-    this.fs = fileSystem;
-    this.pathToFile = fs.makeQualified(pathToFile);
-    this.projectedColumns = projectedColumns;
-  }
-
-  @Override
-  public void setup(final OperatorContext context, final OutputMutator output) throws ExecutionSetupException {
-    try {
-      this.output = output;
-      this.in = fs.open(pathToFile);
-      this.decoder = new PacketDecoder(in);
-      this.buffer = new byte[BUFFER_SIZE + decoder.getMaxLength()];
-      this.validBytes = in.read(buffer);
-      this.projectedCols = getProjectedColsIfItNull();
-      setColumns(projectedColumns);
-    } catch (IOException io) {
-      throw UserException.dataReadError(io)
-              .addContext("File name:", pathToFile.toUri().getPath())
-              .build(logger);
-    }
-  }
-
-  @Override
-  public int next() {
-    try {
-      return parsePcapFilesAndPutItToTable();
-    } catch (IOException io) {
-      throw UserException.dataReadError(io)
-              .addContext("Trouble with reading packets in file!")
-              .build(logger);
-    }
-  }
-
-  @Override
-  public void close() throws Exception {
-    in.close();
-  }
-
-  private ImmutableList<ProjectedColumnInfo> getProjectedColsIfItNull() {
-    return projectedCols != null ? projectedCols : initCols(new Schema());
-  }
-
-  private ImmutableList<ProjectedColumnInfo> initCols(final Schema schema) {
-    ImmutableList.Builder<ProjectedColumnInfo> pciBuilder = ImmutableList.builder();
-    ColumnDto column;
-
-    for (int i = 0; i < schema.getNumberOfColumns(); i++) {
-      column = schema.getColumnByIndex(i);
-
-      final String name = column.getColumnName().toLowerCase();
-      final PcapTypes type = column.getColumnType();
-      TypeProtos.MinorType minorType = TYPES.get(type);
-
-      ProjectedColumnInfo pci = getProjectedColumnInfo(column, name, minorType);
-      pciBuilder.add(pci);
-    }
-    return pciBuilder.build();
-  }
-
-  private ProjectedColumnInfo getProjectedColumnInfo(final ColumnDto column,
-                                                     final String name,
-                                                     final MinorType minorType) {
-    TypeProtos.MajorType majorType = getMajorType(minorType);
-
-    MaterializedField field =
-            MaterializedField.create(name, majorType);
-
-    ValueVector vector =
-            getValueVector(minorType, majorType, field);
-
-    return getProjectedColumnInfo(column, vector);
-  }
-
-  private ProjectedColumnInfo getProjectedColumnInfo(final ColumnDto column, final ValueVector vector) {
-    ProjectedColumnInfo pci = new ProjectedColumnInfo();
-    pci.vv = vector;
-    pci.pcapColumn = column;
-    return pci;
-  }
-
-  private MajorType getMajorType(final MinorType minorType) {
-    return Types.optional(minorType);
-  }
-
-  private ValueVector getValueVector(final MinorType minorType,
-                                     final MajorType majorType,
-                                     final MaterializedField field) {
-    try {
-
-      final Class<? extends ValueVector> clazz = TypeHelper.getValueVectorClass(
-              minorType, majorType.getMode());
-      ValueVector vector = output.addField(field, clazz);
-      vector.allocateNew();
-      return vector;
-
-    } catch (SchemaChangeException sce) {
-      throw new IllegalStateException("The addition of this field is incompatible with this OutputMutator's capabilities");
-    }
-  }
-
-  private int parsePcapFilesAndPutItToTable() throws IOException {
-    Packet packet = new Packet();
-    int counter = 0;
-    while (offset < validBytes && counter < BATCH_SIZE) {
-      if (validBytes - offset < decoder.getMaxLength()) {
-        if (validBytes == buffer.length) {
-          // shift data and read more. This is the common case.
-          System.arraycopy(buffer, offset, buffer, 0, validBytes - offset);
-          validBytes = validBytes - offset;
-          offset = 0;
-
-          int n = in.read(buffer, validBytes, buffer.length - validBytes);
-          if (n > 0) {
-            validBytes += n;
-          }
-          logger.info("read {} bytes, at {} offset", n, validBytes);
-        } else {
-          // near the end of the file, we will just top up the buffer without shifting
-          int n = in.read(buffer, offset, buffer.length - offset);
-          if (n > 0) {
-            validBytes = validBytes + n;
-            logger.info("Topped up buffer with {} bytes to yield {}\n", n, validBytes);
-          }
-        }
-      }
-      int old = offset;
-      offset = decoder.decodePacket(buffer, offset, packet, decoder.getMaxLength(), validBytes);
-      if (offset > validBytes) {
-        //Start here...
-        logger.error("Invalid packet at offset {}", old);
-      }
-
-      if (addDataToTable(packet, decoder.getNetwork(), counter)) {
-        counter++;
-      }
-    }
-    return counter;
-  }
-
-  private boolean addDataToTable(final Packet packet, final int networkType, final int count) {
-    for (ProjectedColumnInfo pci : projectedCols) {
-      switch (pci.pcapColumn.getColumnName()) {
-        case "type":
-          setStringColumnValue(packet.getPacketType(), pci, count);
-          break;
-        case "timestamp":
-          setTimestampColumnValue(packet.getTimestamp(), pci, count);
-          break;
-        case "timestamp_micro":
-          setLongColumnValue(packet.getTimestampMicro(), pci, count);
-          break;
-        case "network":
-          setIntegerColumnValue(networkType, pci, count);
-          break;
-        case "src_mac_address":
-          setStringColumnValue(packet.getEthernetSource(), pci, count);
-          break;
-        case "dst_mac_address":
-          setStringColumnValue(packet.getEthernetDestination(), pci, count);
-          break;
-        case "dst_ip":
-          if (packet.getDst_ip() != null) {
-            setStringColumnValue(packet.getDst_ip().getHostAddress(), pci, count);
-          } else {
-            setStringColumnValue(null, pci, count);
-          }
-          break;
-        case "src_ip":
-          if (packet.getSrc_ip() != null) {
-            setStringColumnValue(packet.getSrc_ip().getHostAddress(), pci, count);
-          } else {
-            setStringColumnValue(null, pci, count);
-          }
-          break;
-        case "src_port":
-          setIntegerColumnValue(packet.getSrc_port(), pci, count);
-          break;
-        case "dst_port":
-          setIntegerColumnValue(packet.getDst_port(), pci, count);
-          break;
-        case "tcp_session":
-          if (packet.isTcpPacket()) {
-            setLongColumnValue(packet.getSessionHash(), pci, count);
-          }
-          break;
-        case "tcp_sequence":
-          if (packet.isTcpPacket()) {
-            setIntegerColumnValue(packet.getSequenceNumber(), pci, count);
-          }
-          break;
-        case "tcp_ack":
-          if (packet.isTcpPacket()) {
-            setBooleanColumnValue(packet.getAckNumber(), pci, count);
-          }
-          break;
-        case "tcp_flags":
-          if (packet.isTcpPacket()) {
-            setIntegerColumnValue(packet.getFlags(), pci, count);
-          }
-          break;
-        case "tcp_parsed_flags":
-          if (packet.isTcpPacket()) {
-            setStringColumnValue(packet.getParsedFlags(), pci, count);
-          }
-          break;
-        case "tcp_flags_ns":
-          if (packet.isTcpPacket()) {
-            setBooleanColumnValue((packet.getFlags() & 0x100) != 0, pci, count);
-          }
-          break;
-        case "tcp_flags_cwr":
-          if (packet.isTcpPacket()) {
-            setBooleanColumnValue((packet.getFlags() & 0x80) != 0, pci, count);
-          }
-          break;
-        case "tcp_flags_ece ":
-          if (packet.isTcpPacket()) {
-            setBooleanColumnValue((packet.getFlags() & 0x40) != 0, pci, count);
-          }
-          break;
-        case "tcp_flags_ece_ecn_capable":
-          if (packet.isTcpPacket()) {
-            setBooleanColumnValue((packet.getFlags() & 0x42) == 0x42, pci, count);
-          }
-          break;
-        case "tcp_flags_ece_congestion_experienced":
-          if (packet.isTcpPacket()) {
-            setBooleanColumnValue((packet.getFlags() & 0x42) == 0x40, pci, count);
-          }
-          break;
-        case "tcp_flags_urg":
-          if (packet.isTcpPacket()) {
-            setBooleanColumnValue((packet.getFlags() & 0x20) != 0, pci, count);
-          }
-          break;
-        case "tcp_flags_ack":
-          if (packet.isTcpPacket()) {
-            setBooleanColumnValue((packet.getFlags() & 0x10) != 0, pci, count);
-          }
-          break;
-        case "tcp_flags_psh":
-          if (packet.isTcpPacket()) {
-            setBooleanColumnValue((packet.getFlags() & 0x8) != 0, pci, count);
-          }
-          break;
-        case "tcp_flags_rst":
-          if (packet.isTcpPacket()) {
-            setBooleanColumnValue((packet.getFlags() & 0x4) != 0, pci, count);
-          }
-          break;
-        case "tcp_flags_syn":
-          if (packet.isTcpPacket()) {
-            setBooleanColumnValue((packet.getFlags() & 0x2) != 0, pci, count);
-          }
-          break;
-        case "tcp_flags_fin":
-          if (packet.isTcpPacket()) {
-            setBooleanColumnValue((packet.getFlags() & 0x1) != 0, pci, count);
-          }
-          break;
-        case "packet_length":
-          setIntegerColumnValue(packet.getPacketLength(), pci, count);
-          break;
-        case "is_corrupt":
-          setBooleanColumnValue(packet.isCorrupt(), pci, count);
-          break;
-        case "data":
-          if (packet.getData() != null) {
-            setStringColumnValue(parseBytesToASCII(packet.getData()), pci, count);
-          } else {
-            setStringColumnValue("[]", pci, count);
-          }
-          break;
-      }
-    }
-    return true;
-  }
-
-  private void setLongColumnValue(long data, ProjectedColumnInfo pci, final int count) {
-    ((NullableBigIntVector.Mutator) pci.vv.getMutator())
-            .setSafe(count, data);
-  }
-
-  private void setIntegerColumnValue(final int data, final ProjectedColumnInfo pci, final int count) {
-    ((NullableIntVector.Mutator) pci.vv.getMutator())
-            .setSafe(count, data);
-  }
-
-  private void setBooleanColumnValue(final boolean data, final ProjectedColumnInfo pci, final int count) {
-    ((NullableBitVector.Mutator) pci.vv.getMutator())
-            .setSafe(count, data ? 1 : 0);
-  }
-
-  private void setBooleanColumnValue(final int data, final ProjectedColumnInfo pci, final int count) {
-    ((NullableBitVector.Mutator) pci.vv.getMutator())
-            .setSafe(count, data);
-  }
-
-  private void setTimestampColumnValue(final long data, final ProjectedColumnInfo pci, final int count) {
-    ((NullableTimeStampVector.Mutator) pci.vv.getMutator())
-            .setSafe(count, data);
-  }
-
-  private void setStringColumnValue(final String data, final ProjectedColumnInfo pci, final int count) {
-    if (data == null) {
-      ((NullableVarCharVector.Mutator) pci.vv.getMutator())
-              .setNull(count);
-    } else {
-      ByteBuffer value = ByteBuffer.wrap(data.getBytes(UTF_8));
-      ((NullableVarCharVector.Mutator) pci.vv.getMutator())
-              .setSafe(count, value, 0, value.remaining());
-    }
-  }
-
-  @Override
-  public String toString() {
-    return "PcapRecordReader[File=" + pathToFile.toUri() + "]";
-  }
-}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/Packet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/Packet.java
index 803a779..7d49699 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/Packet.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/Packet.java
@@ -33,7 +33,7 @@ import static org.apache.drill.exec.store.pcap.PcapFormatUtils.getByte;
 import static org.apache.drill.exec.store.pcap.PcapFormatUtils.getIntFileOrder;
 import static org.apache.drill.exec.store.pcap.PcapFormatUtils.getShort;
 
-public class Packet {
+public class Packet implements Comparable<Packet> {
   // pcap header
   //        typedef struct pcaprec_hdr_s {
   //            guint32 ts_sec;         // timestamp seconds
@@ -223,6 +223,42 @@ public class Packet {
     isCorrupt = value;
   }
 
+  public boolean getUrgFlag() {
+    return (getFlags() & 0x20) != 0; // URG: bit 5 of the value returned by getFlags()
+  }
+
+  public boolean getPshFlag() {
+    return (getFlags() & 0x8) != 0; // PSH: bit 3
+  }
+
+  public boolean getEceFlag() {
+    return (getFlags() & 0x40) != 0; // ECE: bit 6
+  }
+
+  public boolean getSynFlag() {
+    return (getFlags() & 0x2) != 0; // SYN: bit 1
+  }
+
+  public boolean getAckFlag() {
+    return (getFlags() & 0x10) != 0; // ACK: bit 4
+  }
+
+  public boolean getRstFlag() {
+    return (getFlags() & 0x4) != 0; // RST: bit 2
+  }
+
+  public boolean getFinFlag() {
+    return (getFlags() & 0x1) != 0; // FIN: bit 0
+  }
+
+  public boolean getNSFlag() {
+    return (getFlags() & 0x100) != 0; // NS: bit 8, the highest bit tested by these accessors
+  }
+
+  public boolean getCwrFlag() {
+    return (getFlags() & 0x80) != 0; // CWR: bit 7
+  }
+
   public static String formatFlags(int flags) {
     int mask = 0x100;
     StringBuilder r = new StringBuilder();
@@ -491,4 +527,15 @@ public class Packet {
     int dstPortOffset = ipOffset + getIPHeaderLength() + offset;
     return convertShort(raw, dstPortOffset);
   }
+
+  /**
+   * Orders packets by TCP sequence number so that the packets of one stream can be sorted before reassembly.
+   * NOTE(review): the result is a raw subtraction, which presumably wraps on int overflow; if so the ordering is
+   * only reliable while the two sequence numbers are less than 2^31 apart -- confirm getSequenceNumber()'s type.
+   * @param o The packet this packet is compared with.
+   * @return The sequence number of this packet minus the sequence number of {@code o}.
+   */
+  public int compareTo(Packet o) {
+    return this.getSequenceNumber() - (o).getSequenceNumber();
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/TcpHandshake.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/TcpHandshake.java
new file mode 100644
index 0000000..2fc3d05
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/TcpHandshake.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.pcap.decoder;
+
+/**
+ * Tracks the state of a TCP session's handshake as flag events are reported to it. For now it is only used to decide
+ * whether a session is open or closed; SYN flood identification and other flag analysis could be added later.
+ */
+public class TcpHandshake {
+  private boolean isConnected = false;
+
+  private State currentSessionState = State.NONE;
+
+  private long sessionID;   // recorded by setConnected(); not read anywhere in this class
+
+  /**
+   * Returns true once the opening handshake (SYN, SYN/ACK, ACK) completed or setConnected() ran; setRst() clears it.
+   *
+   * @return boolean true if the session is open, false if not.
+   */
+  public boolean isConnected() {
+    return isConnected;
+  }
+
+  /**
+   * Returns true once the session has started closing or was reset, i.e. from the first FIN (CLOSE_WAIT) onwards.
+   *
+   * @return boolean true if the session is closed, false if not.
+   */
+  public boolean isClosed() {
+    final State state = currentSessionState;
+    return state == State.CLOSE_WAIT
+      || state == State.FORCED_CLOSED
+      || state == State.CLOSED
+      || state == State.TIME_WAIT
+      || state == State.FIN_WAIT;
+  }
+
+  public State getCurrentSessionState() {
+    return currentSessionState;
+  }
+
+  /** Marks the session as OPEN and connected directly, without going through the handshake events. */
+  public void setConnected(long sessionID) {
+    this.sessionID = sessionID;
+    currentSessionState = State.OPEN;
+    isConnected = true;
+  }
+
+  /** Records a RST: the session is no longer connected and becomes FORCED_CLOSED. */
+  public void setRst() {
+    isConnected = false;
+    currentSessionState = State.FORCED_CLOSED;
+  }
+
+  /** Records a FIN: OPEN moves to CLOSE_WAIT, CLOSE_WAIT moves to TIME_WAIT; other states are left unchanged. */
+  public void setFin() {
+    if (currentSessionState == State.CLOSE_WAIT) {
+      currentSessionState = State.TIME_WAIT;
+    } else if (currentSessionState == State.OPEN) {
+      currentSessionState = State.CLOSE_WAIT;   // The next packet should be another FIN packet
+    }
+  }
+
+  /** Records an ACK: SYN moves to SYNACK, SYNACK opens the session, CLOSE_WAIT moves to FIN_WAIT. */
+  public void setAck() {
+    if (currentSessionState == State.SYN) {
+      currentSessionState = State.SYNACK;
+    } else if (currentSessionState == State.SYNACK) {
+      currentSessionState = State.OPEN;
+      isConnected = true;
+    } else if (currentSessionState == State.CLOSE_WAIT) {
+      currentSessionState = State.FIN_WAIT;
+    }
+  }
+
+  /** Records a SYN: starts the handshake when nothing has been seen yet (state NONE); ignored in any other state. */
+  public void setSyn() {
+    if (currentSessionState == State.NONE) {
+      currentSessionState = State.SYN;
+    }
+  }
+
+  /**
+   * This enum variable represents the various states of the TCP Handshake
+   */
+  enum State {
+    /** The NONE state is the initial state. No session has been established. */
+    NONE,
+    /**
+     * The OPEN state represents a successfully opened TCP session. It is reached on the final step of the TCP
+     * handshake, or directly through setConnected().
+     */
+    OPEN,
+    /**
+     * The CLOSED state represents a fully closed TCP session. Nothing in this class currently moves a session into it.
+     */
+    CLOSED,
+    /**
+     * The CLOSE_WAIT state represents a session in which one party has sent a frame with a FIN flag set.
+     * At this point resources can be released, however, to fully close the session the other party needs to send a frame
+     * with an ACK packet.
+     */
+    CLOSE_WAIT,
+    /**
+     * The TIME_WAIT state is entered when a further FIN is recorded while the session is in CLOSE_WAIT.
+     */
+    TIME_WAIT,
+    /**
+     * The SYN state represents the first step in the TCP handshake. The originator has sent a frame with the SYN flag set.
+     * The next step would be the SYN/ACK stage.
+     */
+    SYN,
+    /**
+     * This step represents the second step of the TCP handshake in which the recipient acknowledges the originator's SYN
+     * flag.
+     */
+    SYNACK,
+    /**
+     * The FORCED_CLOSED state represents a session which was closed forcefully by a RST frame.
+     */
+    FORCED_CLOSED,
+    /**
+     * The FIN_WAIT state is entered when an ACK is recorded while the session is in CLOSE_WAIT.
+     */
+    FIN_WAIT
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/TcpSession.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/TcpSession.java
new file mode 100644
index 0000000..5900fd6
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/TcpSession.java
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.pcap.decoder;
+
+import org.joda.time.Instant;
+import org.joda.time.Period;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.drill.exec.store.pcap.PcapFormatUtils.parseBytesToASCII;
+
+/**
+ * This class is the representation of a TCP session: the packets seen in each direction, the state of the handshake
+ * and, once closeSession() has run, the reassembled payload of each direction.
+ */
+public class TcpSession {
+
+  private final List<Packet> packetsFromSender;
+  private final List<Packet> packetsFromReceiver;
+  private final TcpHandshake handshake;
+  private final long sessionID;
+
+  private long startTime;
+  private long endTime;
+  private int packetCount;
+  private InetAddress srcIP;
+  private InetAddress dstIP;
+  private int srcPort;
+  private int dstPort;
+  private String srcMac;
+  private String dstMac;
+  private long synTime;
+  private long ackTime;
+  private long connectTime;
+  private byte[] sentData;
+  private byte[] receivedData;
+  private int sentDataSize;
+  private int receivedDataSize;
+  private boolean hasCorruptedData = false;
+
+  private static final Logger logger = LoggerFactory.getLogger(TcpSession.class);
+
+  public TcpSession (long sessionID) {
+    packetsFromSender = new ArrayList<>();
+    packetsFromReceiver = new ArrayList<>();
+
+    handshake = new TcpHandshake();
+    this.sessionID = sessionID;
+  }
+
+  /**
+   * Adds a packet to the TCP session: files it by direction, advances the handshake from its TCP flags and
+   * widens the session's time range. Non-TCP packets and packets with a different session hash are ignored.
+   * @param p The Packet to be added to the session
+   */
+  public void addPacket(Packet p) {
+
+    // Only attempt to add TCP packets to session
+    if (!p.getPacketType().equalsIgnoreCase("TCP")) {
+      return;
+    }
+
+    // These variables should be consistent within a TCP session
+    if (packetCount == 0) {
+      srcIP = p.getSrc_ip();
+      dstIP = p.getDst_ip();
+
+      srcPort = p.getSrc_port();
+      dstPort = p.getDst_port();
+
+      srcMac = p.getEthernetSource();
+      dstMac = p.getEthernetDestination();
+      startTime = p.getTimestamp();
+    } else if (p.getSessionHash() != sessionID) {
+      logger.warn("Attempting to add session {} to incorrect TCP session.", sessionID);
+      return;
+    }
+
+    // Work out once whether the packet comes from the originator, i.e. the source of the session's first packet
+    final String packetSource = p.getSrc_ip().getHostAddress();
+    final boolean fromOriginator = packetSource.equalsIgnoreCase(srcIP.getHostAddress());
+
+    // Add packet to appropriate list and increment the data size counter
+    final byte[] payload = p.getData();
+    if (fromOriginator) {
+      packetsFromSender.add(p);
+      if (payload != null) {
+        sentDataSize += payload.length;
+      }
+    } else {
+      packetsFromReceiver.add(p);
+      if (payload != null) {
+        receivedDataSize += payload.length;
+      }
+    }
+
+    // Check flags if connection is not established
+    if (!handshake.isConnected()) {
+      if (p.getSynFlag() && fromOriginator) {
+        // This is part 1 of the TCP session handshake
+        // The host sends the first SYN packet
+        handshake.setSyn();
+        synTime = p.getTimestamp();
+      } else if (p.getSynFlag() && p.getAckFlag() && packetSource.equalsIgnoreCase(dstIP.getHostAddress())) {
+        // This condition represents the second part of the TCP Handshake,
+        // where the receiver sends a frame with the SYN/ACK flags set to the originator
+        handshake.setAck();
+      } else if (p.getAckFlag() && fromOriginator) {
+        // Finally, this condition represents a successful opening of a TCP session, when the originator sends a frame
+        // with the ACK flag set. The time elapsed since the SYN is recorded as the connection time.
+        handshake.setAck();
+        ackTime = p.getTimestamp();
+        connectTime = ackTime - synTime;
+      }
+    } else {
+      /* Check for flags to close connection. Closing a TCP session is more difficult than opening a session and there are
+      * a lot of ways that it can go wrong. See https://accedian.com/enterprises/blog/close-tcp-sessions-diagnose-disconnections/ for references on
+      * closing TCP sessions.
+      *
+      * To close a session correctly, there are four steps:
+      * 1.  Party A sends a frame to party B with the FIN flag
+      * 2.  Party B sends a frame with an ACK flag
+      * 3.  Party B then follows with a frame with the FIN flag set
+      * 4.  Party A then confirms with an ACK flag.
+      *
+      * Technically, the session is closed upon the first FIN flag and resources can be released at that point. However, a lot can go wrong, so to force the closing of a
+      * session, either party can send a frame with a RST flag set which forces the closing of the session.
+      */
+
+      if (p.getRstFlag()) {
+        // This is the case for a forced closed connection.  Session is immediately closed.
+        handshake.setRst();
+      } else if (p.getFinFlag()) {
+        // This is the beginning of the normal session closure procedure.  If a FIN flag has been seen, the session is
+        // basically closed even if one party continues to send data.
+        handshake.setFin();
+      }
+      if (p.getAckFlag() && p.getFinFlag()) {
+        handshake.setAck();
+      }
+    }
+
+    // Augment the packet counter
+    packetCount++;
+
+    if (p.isCorrupt()) {
+      hasCorruptedData = true;
+    }
+
+    // Track the earliest and latest timestamps; packets are not necessarily received in order. getTimestamp() is used
+    // for both the comparison and the stored value so that the two always share the same unit.
+    final long timestamp = p.getTimestamp();
+    if (timestamp < startTime) {
+      startTime = timestamp;
+    }
+    if (timestamp > endTime) {
+      endTime = timestamp;
+    }
+
+    // Close the session if the closing handshake is complete
+    if (handshake.isClosed()) {
+      closeSession();
+    }
+  }
+
+  /**
+   * This function returns true if the TCP session has been established, false if not.
+   * @return True if the session has been established, false if not.
+   */
+  public boolean connectionEstablished() {
+    return handshake.isConnected();
+  }
+
+  public boolean connectionClosed() {
+    return handshake.isClosed();
+  }
+
+  /**
+   * Sorts the packets of each direction and rebuilds the sent and received byte arrays from their payloads.
+   * It is called by addPacket() whenever the handshake reports the session as closed, and may be called again.
+   */
+  public void closeSession() {
+    logger.debug("Closing session {}", sessionID);
+    /* The sent and received bytes cannot be written until the session is closed.
+    * Upon receipt of the FIN->FIN/ACK handshake, write everything.
+    *
+    * Since it cannot be assumed that the packets were received in the correct order, we must:
+    * 1. Sort them by TCP Sequence Number
+    * 2. Write the data to the respective byte array
+    */
+    Collections.sort(packetsFromSender);
+    Collections.sort(packetsFromReceiver);
+
+    sentData = assemblePayload(packetsFromSender, sentDataSize);
+    receivedData = assemblePayload(packetsFromReceiver, receivedDataSize);
+  }
+
+  /**
+   * Concatenates the payloads of the given packets, in list order, into one array. Packets without data are skipped.
+   * @param packets The packets of one direction, already sorted
+   * @param totalSize The summed payload length of those packets
+   * @return A new array of totalSize bytes holding the concatenated payloads
+   */
+  private static byte[] assemblePayload(List<Packet> packets, int totalSize) {
+    final byte[] assembled = new byte[totalSize];
+    int offset = 0;
+    for (Packet packet : packets) {
+      final byte[] payload = packet.getData();
+      if (payload != null) {
+        System.arraycopy(payload, 0, assembled, offset, payload.length);
+        offset += payload.length;
+      }
+    }
+    return assembled;
+  }
+
+  public Instant getSessionStartTime() {
+    return new Instant(startTime);
+  }
+
+  /** Returns the span between the earliest and the latest packet timestamp recorded for this session. */
+  public Period getSessionDuration() {
+    return new Period(endTime - startTime);
+  }
+
+  /** Returns the time between the originator's SYN and its ACK, as recorded while the session was not connected. */
+  public Period getConnectionTime() {
+    return new Period(connectTime);
+  }
+
+  public Instant getSessionEndTime() {
+    return new Instant(endTime);
+  }
+
+  public String getSrcMac() {
+    return srcMac;
+  }
+
+  public String getDstMac() {
+    return dstMac;
+  }
+
+  public String getSrcIP() {
+    return srcIP.getHostAddress();
+  }
+
+  public String getDstIP() {
+    return dstIP.getHostAddress();
+  }
+
+  public int getSrcPort() {
+    return srcPort;
+  }
+
+  public int getDstPort() {
+    return dstPort;
+  }
+
+  public long getSessionID() {
+    return sessionID;
+  }
+
+  public int getPacketCount() {
+    return packetsFromReceiver.size() + packetsFromSender.size();
+  }
+
+  public int getPacketCountFromOrigin() { return packetsFromSender.size(); }
+
+  public int getPacketCountFromRemote() { return packetsFromReceiver.size(); }
+
+  public boolean hasCorruptedData() {
+    return hasCorruptedData;
+  }
+
+  /** Returns the number of payload bytes sent by the originator; safe to call before closeSession() has run. */
+  public int getDataVolumeFromOrigin() {
+    return sentData == null ? sentDataSize : sentData.length;
+  }
+
+  /** Returns the number of payload bytes sent by the remote party; safe to call before closeSession() has run. */
+  public int getDataVolumeFromRemote() {
+    return receivedData == null ? receivedDataSize : receivedData.length;
+  }
+
+  public byte[] getDataFromOriginator() {
+    return sentData;
+  }
+
+  public String getDataFromOriginatorAsString() {
+    return parseBytesToASCII(sentData);
+  }
+
+  public byte[] getDataFromRemote() {
+    return receivedData;
+  }
+
+  public String getDataFromRemoteAsString() {
+    return parseBytesToASCII(receivedData);
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/schema/PcapTypes.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/schema/PcapTypes.java
index fc6e029..c1fb4db 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/schema/PcapTypes.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/schema/PcapTypes.java
@@ -22,5 +22,6 @@ public enum PcapTypes {
   INTEGER,
   STRING,
   LONG,
-  TIMESTAMP
-}
\ No newline at end of file
+  TIMESTAMP,
+  DURATION
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/schema/Schema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/schema/Schema.java
index d88bc97..7655d26 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/schema/Schema.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/schema/Schema.java
@@ -30,8 +30,10 @@ public class Schema {
 
   private final List<ColumnDto> columns = new ArrayList<>();
   private final MinorType typeMap[] = new MinorType[PcapTypes.values().length];
+  private final boolean sessionizeTCPStreams;
 
-  public Schema() {
+  public Schema(boolean sessionSchema) {
+    this.sessionizeTCPStreams = sessionSchema;
     setupStructure();
   }
 
@@ -41,36 +43,58 @@ public class Schema {
     typeMap[PcapTypes.STRING.ordinal()] = MinorType.VARCHAR;
     typeMap[PcapTypes.LONG.ordinal()] = MinorType.BIGINT;
     typeMap[PcapTypes.TIMESTAMP.ordinal()] = MinorType.TIMESTAMP;
+    typeMap[PcapTypes.DURATION.ordinal()] = MinorType.INTERVAL;
 
-    columns.add(new ColumnDto("type", PcapTypes.STRING));
-    columns.add(new ColumnDto("network", PcapTypes.INTEGER));
-    columns.add(new ColumnDto("packet_timestamp", PcapTypes.TIMESTAMP));
-    columns.add(new ColumnDto("timestamp_micro", PcapTypes.LONG));
+    // Common columns
     columns.add(new ColumnDto("src_ip", PcapTypes.STRING));
     columns.add(new ColumnDto("dst_ip", PcapTypes.STRING));
     columns.add(new ColumnDto("src_port", PcapTypes.INTEGER));
     columns.add(new ColumnDto("dst_port", PcapTypes.INTEGER));
     columns.add(new ColumnDto("src_mac_address", PcapTypes.STRING));
     columns.add(new ColumnDto("dst_mac_address", PcapTypes.STRING));
-    columns.add(new ColumnDto("tcp_session", PcapTypes.LONG));
-    columns.add(new ColumnDto("tcp_sequence", PcapTypes.INTEGER));
-    columns.add(new ColumnDto("tcp_ack", PcapTypes.BOOLEAN));
-    columns.add(new ColumnDto("tcp_flags", PcapTypes.INTEGER));
-    columns.add(new ColumnDto("tcp_flags_ns", PcapTypes.BOOLEAN));
-    columns.add(new ColumnDto("tcp_flags_cwr", PcapTypes.BOOLEAN));
-    columns.add(new ColumnDto("tcp_flags_ece", PcapTypes.BOOLEAN ));
-    columns.add(new ColumnDto("tcp_flags_ece_ecn_capable", PcapTypes.BOOLEAN ));
-    columns.add(new ColumnDto("tcp_flags_ece_congestion_experienced", PcapTypes.BOOLEAN ));
-    columns.add(new ColumnDto("tcp_flags_urg", PcapTypes.BOOLEAN ));
-    columns.add(new ColumnDto("tcp_flags_ack", PcapTypes.BOOLEAN ));
-    columns.add(new ColumnDto("tcp_flags_psh", PcapTypes.BOOLEAN ));
-    columns.add(new ColumnDto("tcp_flags_rst", PcapTypes.BOOLEAN ));
-    columns.add(new ColumnDto("tcp_flags_syn", PcapTypes.BOOLEAN ));
-    columns.add(new ColumnDto("tcp_flags_fin", PcapTypes.BOOLEAN ));
-    columns.add(new ColumnDto("tcp_parsed_flags", PcapTypes.STRING));
-    columns.add(new ColumnDto("packet_length", PcapTypes.INTEGER));
-    columns.add(new ColumnDto("is_corrupt", PcapTypes.BOOLEAN));
-    columns.add(new ColumnDto("data", PcapTypes.STRING));
+
+    // Columns specific for Sessionized TCP Sessions
+    if (sessionizeTCPStreams) {
+      columns.add(new ColumnDto("session_start_time", PcapTypes.TIMESTAMP));
+      columns.add(new ColumnDto("session_end_time", PcapTypes.TIMESTAMP));
+      columns.add(new ColumnDto("session_duration", PcapTypes.DURATION));
+      columns.add(new ColumnDto("total_packet_count", PcapTypes.INTEGER));
+      columns.add(new ColumnDto("data_volume_from_origin", PcapTypes.INTEGER));
+      columns.add(new ColumnDto("data_volume_from_remote", PcapTypes.INTEGER));
+      columns.add(new ColumnDto("packet_count_from_origin", PcapTypes.INTEGER));
+      columns.add(new ColumnDto("packet_count_from_remote", PcapTypes.INTEGER));
+
+      columns.add(new ColumnDto("connection_time", PcapTypes.DURATION));
+      columns.add(new ColumnDto("tcp_session", PcapTypes.LONG));
+      columns.add(new ColumnDto("is_corrupt", PcapTypes.BOOLEAN));
+      columns.add(new ColumnDto("data_from_originator", PcapTypes.STRING));
+      columns.add(new ColumnDto("data_from_remote", PcapTypes.STRING));
+    } else {
+      // Columns for Regular Packets
+      columns.add(new ColumnDto("type", PcapTypes.STRING));
+      columns.add(new ColumnDto("network", PcapTypes.INTEGER));
+      columns.add(new ColumnDto("packet_timestamp", PcapTypes.TIMESTAMP));
+      columns.add(new ColumnDto("timestamp_micro", PcapTypes.LONG));
+      columns.add(new ColumnDto("tcp_session", PcapTypes.LONG));
+      columns.add(new ColumnDto("tcp_sequence", PcapTypes.INTEGER));
+      columns.add(new ColumnDto("tcp_ack", PcapTypes.BOOLEAN));
+      columns.add(new ColumnDto("tcp_flags", PcapTypes.INTEGER));
+      columns.add(new ColumnDto("tcp_flags_ns", PcapTypes.BOOLEAN));
+      columns.add(new ColumnDto("tcp_flags_cwr", PcapTypes.BOOLEAN));
+      columns.add(new ColumnDto("tcp_flags_ece", PcapTypes.BOOLEAN));
+      columns.add(new ColumnDto("tcp_flags_ece_ecn_capable", PcapTypes.BOOLEAN));
+      columns.add(new ColumnDto("tcp_flags_ece_congestion_experienced", PcapTypes.BOOLEAN));
+      columns.add(new ColumnDto("tcp_flags_urg", PcapTypes.BOOLEAN));
+      columns.add(new ColumnDto("tcp_flags_ack", PcapTypes.BOOLEAN));
+      columns.add(new ColumnDto("tcp_flags_psh", PcapTypes.BOOLEAN));
+      columns.add(new ColumnDto("tcp_flags_rst", PcapTypes.BOOLEAN));
+      columns.add(new ColumnDto("tcp_flags_syn", PcapTypes.BOOLEAN));
+      columns.add(new ColumnDto("tcp_flags_fin", PcapTypes.BOOLEAN));
+      columns.add(new ColumnDto("tcp_parsed_flags", PcapTypes.STRING));
+      columns.add(new ColumnDto("packet_length", PcapTypes.INTEGER));
+      columns.add(new ColumnDto("is_corrupt", PcapTypes.BOOLEAN));
+      columns.add(new ColumnDto("data", PcapTypes.STRING));
+    }
   }
 
   /**
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFormatPluginOptionExtractor.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFormatPluginOptionExtractor.java
index f43afb7..7721dc7 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFormatPluginOptionExtractor.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFormatPluginOptionExtractor.java
@@ -58,11 +58,13 @@ public class TestFormatPluginOptionExtractor {
           break;
         case "json":
         case "sequencefile":
-        case "pcap":
         case "pcapng":
         case "avro":
           assertEquals(d.typeName, "(type: String)", d.presentParams());
           break;
+        case "pcap":
+          assertEquals(d.typeName, "(type: String, sessionizeTCPStreams: boolean)", d.presentParams());
+          break;
         case "httpd":
           assertEquals("(type: String, logFormat: String, timestampFormat: String)", d.presentParams());
           break;
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapDecoder.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapDecoder.java
index f0e89d9..54021d2 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapDecoder.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapDecoder.java
@@ -22,6 +22,8 @@ import org.apache.drill.test.BaseTestQuery;
 import org.apache.drill.exec.store.pcap.decoder.Packet;
 import org.apache.drill.exec.store.pcap.decoder.PacketDecoder;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.BufferedInputStream;
 import java.io.DataOutputStream;
@@ -35,7 +37,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 public class TestPcapDecoder extends BaseTestQuery {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestPcapDecoder.class);
+  private static final Logger logger = LoggerFactory.getLogger(TestPcapDecoder.class);
 
   private static File bigFile;
 
@@ -61,7 +63,7 @@ public class TestPcapDecoder extends BaseTestQuery {
     int offset = 0;
 
 
-    byte[] buffer = new byte[PcapRecordReader.BUFFER_SIZE + pd.getMaxLength()];
+    byte[] buffer = new byte[PcapBatchReader.BUFFER_SIZE + pd.getMaxLength()];
     int validBytes = in.read(buffer);
     assertTrue(validBytes > 50);
 
@@ -168,7 +170,7 @@ public class TestPcapDecoder extends BaseTestQuery {
     PacketDecoder pd = new PacketDecoder(in);
     Packet p = pd.packet();
 
-    byte[] buffer = new byte[PcapRecordReader.BUFFER_SIZE + pd.getMaxLength()];
+    byte[] buffer = new byte[PcapBatchReader.BUFFER_SIZE + pd.getMaxLength()];
     int validBytes = in.read(buffer);
 
     int offset = 0;
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapEVFReader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapEVFReader.java
index 246691e..5b07423 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapEVFReader.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapEVFReader.java
@@ -29,7 +29,6 @@ import org.apache.drill.test.ClusterTest;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-
 import java.time.LocalDateTime;
 import java.time.Month;
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestSessionizePCAP.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestSessionizePCAP.java
new file mode 100644
index 0000000..8c2818d
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestSessionizePCAP.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.pcap;
+
+
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.joda.time.Period;
+import java.nio.file.Paths;
+import java.time.LocalDateTime;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import java.time.format.DateTimeFormatter;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Query-level tests for the pcap format plugin with {@code sessionizeTCPStreams} enabled,
+ * run against the {@code attack-trace.pcap} test resource.
+ */
+public class TestSessionizePCAP extends ClusterTest {
+
+  private static final DateTimeFormatter TIMESTAMP_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS");
+
+  /**
+   * Starts the cluster, defines a {@code pcap} format on {@code cp} with TCP sessionizing
+   * switched on, and copies the pcap test resources into the test root directory.
+   */
+  @BeforeClass
+  public static void setup() throws Exception {
+    startCluster(ClusterFixture.builder(dirTestWatcher));
+
+    PcapFormatConfig sessionizingConfig = new PcapFormatConfig();
+    sessionizingConfig.sessionizeTCPStreams = true;
+    cluster.defineFormat("cp", "pcap", sessionizingConfig);
+
+    dirTestWatcher.copyResourceToRoot(Paths.get("store/pcap/"));
+  }
+
+  /** A star query filtered to ports 1821 and 445 must yield the expected session row. */
+  @Test
+  public void testSessionizedStarQuery() throws Exception {
+    assertMatchesExpectedSession(
+      "SELECT * FROM cp.`/store/pcap/attack-trace.pcap` WHERE src_port=1821 AND dst_port=445");
+  }
+
+  /** Naming every session column explicitly must yield the same row as the star query. */
+  @Test
+  public void testSessionizedSpecificQuery() throws Exception {
+    String sql = "SELECT session_start_time, session_end_time,session_duration, total_packet_count, connection_time, src_ip, dst_ip, src_port, dst_port, src_mac_address, dst_mac_address, tcp_session, " +
+      "is_corrupt, data_from_originator, data_from_remote, data_volume_from_origin, data_volume_from_remote, packet_count_from_origin, packet_count_from_remote " +
+      "FROM cp.`/store/pcap/attack-trace.pcap` WHERE src_port=1821 AND dst_port=445";
+    assertMatchesExpectedSession(sql);
+  }
+
+  /** The COUNT(*) plan must still execute after being rendered to JSON and resubmitted. */
+  @Test
+  public void testSerDe() throws Exception {
+    String countSql = "SELECT COUNT(*) FROM cp.`/store/pcap/attack-trace.pcap`";
+    String jsonPlan = queryBuilder().sql(countSql).explainJson();
+    long rowCount = queryBuilder().physical(jsonPlan).singletonLong();
+    assertEquals("Counts should match", 5L, rowCount);
+  }
+
+  /**
+   * Runs {@code sql} and compares its result with the one expected session row.
+   *
+   * @param sql query over the sessionized capture that returns every session column
+   */
+  private void assertMatchesExpectedSession(String sql) throws Exception {
+    testBuilder()
+      .sqlQuery(sql)
+      .ordered()
+      .baselineColumns(
+        "session_start_time", "session_end_time", "session_duration", "total_packet_count", "connection_time",
+        "src_ip", "dst_ip", "src_port", "dst_port", "src_mac_address", "dst_mac_address", "tcp_session",
+        "is_corrupt", "data_from_originator", "data_from_remote", "data_volume_from_origin",
+        "data_volume_from_remote", "packet_count_from_origin", "packet_count_from_remote")
+      .baselineValues(
+        LocalDateTime.parse("2009-04-20T03:28:28.374", TIMESTAMP_FORMAT),
+        LocalDateTime.parse("2009-04-20T03:28:28.508", TIMESTAMP_FORMAT),
+        Period.parse("PT0.134S"), 4,
+        Period.parse("PT0.119S"),
+        "98.114.205.102", "192.150.11.111",
+        1821, 445,
+        "00:08:E2:3B:56:01", "00:30:48:62:4E:4A",
+        -8791568836279708938L, false,
+        "........I....>...>..........Ib...<...<..........I....>...>", "", 62, 0, 3, 1)
+      .go();
+  }
+}
diff --git a/exec/java-exec/src/test/resources/store/pcap/attack-trace.pcap b/exec/java-exec/src/test/resources/store/pcap/attack-trace.pcap
new file mode 100644
index 0000000..68e1fff
Binary files /dev/null and b/exec/java-exec/src/test/resources/store/pcap/attack-trace.pcap differ