You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by vi...@apache.org on 2021/04/26 08:19:54 UTC

[drill] branch master updated: DRILL-7828: Refactor Pcap and Pcapng format plugin (#2192)

This is an automated email from the ASF dual-hosted git repository.

vitalii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new 360b080  DRILL-7828: Refactor Pcap and Pcapng format plugin (#2192)
360b080 is described below

commit 360b080ff04703aadfb0085ca6464fb2e85dd6d5
Author: Vitalii Diravka <vi...@apache.org>
AuthorDate: Mon Apr 26 11:19:43 2021 +0300

    DRILL-7828: Refactor Pcap and Pcapng format plugin (#2192)
---
 .../drill/exec/store/pcap/PcapBatchReader.java     |  78 +----------
 .../drill/exec/store/pcap/PcapFormatUtils.java     |   2 +-
 .../drill/exec/store/pcap/decoder/Murmur128.java   |   0
 .../drill/exec/store/pcap/decoder/Packet.java      |   0
 .../exec/store/pcap/decoder/PacketConstants.java   |   0
 .../exec/store/pcap/decoder/PacketDecoder.java     |  27 +++-
 .../exec/store/pcap/decoder/TcpHandshake.java      |   0
 .../drill/exec/store/pcap/decoder/TcpSession.java  |   0
 .../drill/exec/store/pcap/dto/ColumnDto.java       |   0
 .../apache/drill/exec/store/pcap/package-info.java |   0
 .../store/pcap/plugin/BasePcapFormatPlugin.java    | 153 +++++++++++++++++++++
 .../plugin/PcapFormatConfig.java}                  |  35 +++--
 .../exec/store/pcap/plugin/PcapFormatPlugin.java   |  22 ++-
 .../plugin}/PcapngFormatConfig.java                |  57 ++------
 .../exec/store/pcap/plugin/PcapngFormatPlugin.java |  25 ++--
 .../drill/exec/store/pcap/schema/PcapTypes.java    |   0
 .../drill/exec/store/pcap/schema/Schema.java       |   0
 .../drill/exec/store/pcapng/PcapngBatchReader.java |   5 +-
 .../exec/store/pcapng/PcapngFormatPlugin.java      |  90 ------------
 .../main/resources/bootstrap-format-plugins.json   |  15 +-
 .../store/pcap/TestPcapWithPersistentStore.java    |  84 +++++++++++
 .../apache/drill/exec/store/pcap/ConcatPcap.java   |   0
 .../drill/exec/store/pcap/TestPcapDecoder.java     |   4 +-
 .../drill/exec/store/pcap/TestPcapEVFReader.java   |  11 +-
 .../exec/store/pcap/TestPcapRecordReader.java      |  41 ++++--
 .../drill/exec/store/pcap/TestSessionizePCAP.java  |  15 +-
 .../exec/store/pcapng/TestPcapngRecordReader.java  |  48 ++++++-
 .../store/pcapng/TestPcapngStatRecordReader.java   |   6 +-
 .../src/test/resources/config/oldPcapPlugins.json  |  12 ++
 .../src/test/resources}/pcap/arpWithNullIP.pcap    | Bin
 .../src/test/resources}/pcap/data-1.pcap           | Bin
 .../src/test/resources/pcap/data-2.pcap            | Bin 0 -> 1475104 bytes
 .../src/test/resources}/pcap/dataFromRemote.txt    |   0
 .../src/test/resources}/pcap/http.pcap             | Bin
 .../src/test/resources}/pcap/synscan.pcap          | Bin
 .../src/test/resources}/pcap/tcp-1.pcap            | Bin
 .../src/test/resources}/pcap/tcp-2.pcap            | Bin
 .../src/test/resources}/pcap/testv1.pcap           | Bin
 .../src/test/resources/pcapng/example.pcap         | Bin 0 -> 512 bytes
 .../src/test/resources/todo/dhcp.pcapng            | Bin 0 -> 1733 bytes
 .../src/test/resources/todo/dhcp_big_endian.pcapng | Bin 0 -> 1757 bytes
 .../test/resources/todo/dhcp_little_endian.pcapng  | Bin 0 -> 1757 bytes
 .../src/test/resources/todo/many_interfaces.pcapng | Bin 0 -> 30743 bytes
 .../apache/drill/hbase/TestHBaseTableProvider.java |   6 +-
 .../impl/scan/columns/ColumnsScanFramework.java    |   1 -
 .../physical/impl/scan/file/FileScanFramework.java |  18 ++-
 .../exec/store/dfs/easy/EasyFormatPlugin.java      |   3 +-
 .../drill/exec/store/pcap/PcapFormatConfig.java    |  83 -----------
 .../drill/exec/store/pcap/PcapFormatPlugin.java    |  88 ------------
 .../main/resources/bootstrap-storage-plugins.json  |   9 --
 .../org/apache/drill/exec/TestWithZookeeper.java   |  17 +++
 .../drill/exec/store/FormatPluginSerDeTest.java    |  10 --
 .../drill/exec/store/sys/TestPStoreProviders.java  |  15 --
 .../resources/plugins/mock-plugin-upgrade.json     |   6 -
 .../src/test/resources/store/pcap/data-2.pcap      | Bin 1167896 -> 0 bytes
 55 files changed, 481 insertions(+), 505 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapBatchReader.java b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/PcapBatchReader.java
similarity index 95%
rename from exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapBatchReader.java
rename to contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/PcapBatchReader.java
index 83e2115..fd7fef9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapBatchReader.java
+++ b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/PcapBatchReader.java
@@ -28,6 +28,7 @@ import org.apache.drill.exec.store.pcap.decoder.Packet;
 import org.apache.drill.exec.store.pcap.decoder.PacketDecoder;
 import org.apache.drill.exec.store.pcap.decoder.TcpSession;
 import org.apache.drill.exec.store.pcap.schema.Schema;
+import org.apache.drill.exec.store.pcap.plugin.PcapFormatConfig;
 import org.apache.drill.exec.vector.accessor.ScalarWriter;
 import org.apache.hadoop.mapred.FileSplit;
 import org.slf4j.Logger;
@@ -48,126 +49,61 @@ public class PcapBatchReader implements ManagedReader<FileSchemaNegotiator> {
   private static final Logger logger = LoggerFactory.getLogger(PcapBatchReader.class);
 
   private FileSplit split;
-
   private PacketDecoder decoder;
-
   private InputStream fsStream;
-
   private RowSetLoader rowWriter;
-
   private int validBytes;
-
   private byte[] buffer;
-
   private int offset;
-
   private ScalarWriter typeWriter;
-
   private ScalarWriter timestampWriter;
-
   private ScalarWriter timestampMicroWriter;
-
   private ScalarWriter networkWriter;
-
   private ScalarWriter srcMacAddressWriter;
-
   private ScalarWriter dstMacAddressWriter;
-
   private ScalarWriter dstIPWriter;
-
   private ScalarWriter srcIPWriter;
-
   private ScalarWriter srcPortWriter;
-
   private ScalarWriter dstPortWriter;
-
   private ScalarWriter packetLengthWriter;
-
   private ScalarWriter tcpSessionWriter;
-
   private ScalarWriter tcpSequenceWriter;
-
   private ScalarWriter tcpAckNumberWriter;
-
   private ScalarWriter tcpFlagsWriter;
-
   private ScalarWriter tcpParsedFlagsWriter;
-
   private ScalarWriter tcpNsWriter;
-
   private ScalarWriter tcpCwrWriter;
-
   private ScalarWriter tcpEceWriter;
-
   private ScalarWriter tcpFlagsEceEcnCapableWriter;
-
   private ScalarWriter tcpFlagsCongestionWriter;
-
   private ScalarWriter tcpUrgWriter;
-
   private ScalarWriter tcpAckWriter;
-
   private ScalarWriter tcpPshWriter;
-
   private ScalarWriter tcpRstWriter;
-
   private ScalarWriter tcpSynWriter;
-
   private ScalarWriter tcpFinWriter;
-
   private ScalarWriter dataWriter;
-
   private ScalarWriter isCorruptWriter;
-
-  private final PcapReaderConfig readerConfig;
-
-
+  private final PcapFormatConfig readerConfig;
   // Writers for TCP Sessions
   private ScalarWriter sessionStartTimeWriter;
-
   private ScalarWriter sessionEndTimeWriter;
-
   private ScalarWriter sessionDurationWriter;
-
   private ScalarWriter connectionTimeWriter;
-
   private ScalarWriter packetCountWriter;
-
   private ScalarWriter originPacketCounterWriter;
-
   private ScalarWriter remotePacketCounterWriter;
-
   private ScalarWriter originDataVolumeWriter;
-
   private ScalarWriter remoteDataVolumeWriter;
-
   private ScalarWriter hostDataWriter;
-
   private ScalarWriter remoteDataWriter;
-
   private final int maxRecords;
-
   private Map<Long, TcpSession> sessionQueue;
 
 
-  public static class PcapReaderConfig {
-
-    protected final PcapFormatPlugin plugin;
-
-    public boolean sessionizeTCPStreams;
-
-    private final PcapFormatConfig config;
-
-    public PcapReaderConfig(PcapFormatPlugin plugin) {
-      this.plugin = plugin;
-      this.config = plugin.getConfig();
-      this.sessionizeTCPStreams = config.getSessionizeTCPStreams();
-    }
-  }
-
-  public PcapBatchReader(PcapReaderConfig readerConfig, int maxRecords) {
+  public PcapBatchReader(PcapFormatConfig readerConfig, int maxRecords) {
     this.readerConfig = readerConfig;
-    if (readerConfig.sessionizeTCPStreams) {
+    if (readerConfig.getSessionizeTCPStreams()) {
       sessionQueue = new HashMap<>();
     }
     this.maxRecords = maxRecords;
@@ -178,7 +114,7 @@ public class PcapBatchReader implements ManagedReader<FileSchemaNegotiator> {
     split = negotiator.split();
     openFile(negotiator);
     SchemaBuilder builder = new SchemaBuilder();
-    Schema pcapSchema = new Schema(readerConfig.sessionizeTCPStreams);
+    Schema pcapSchema = new Schema(readerConfig.getSessionizeTCPStreams());
     TupleMetadata schema = pcapSchema.buildSchema(builder);
     negotiator.tableSchema(schema, false);
     ResultSetLoader loader = negotiator.build();
@@ -238,7 +174,7 @@ public class PcapBatchReader implements ManagedReader<FileSchemaNegotiator> {
   }
 
   private void populateColumnWriters(RowSetLoader rowWriter) {
-    if (readerConfig.sessionizeTCPStreams) {
+    if (readerConfig.getSessionizeTCPStreams()) {
       srcMacAddressWriter = rowWriter.scalar("src_mac_address");
       dstMacAddressWriter = rowWriter.scalar("dst_mac_address");
       dstIPWriter = rowWriter.scalar("dst_ip");
@@ -323,7 +259,7 @@ public class PcapBatchReader implements ManagedReader<FileSchemaNegotiator> {
     }
 
     // If we are resessionizing the TCP Stream, add the packet to the stream
-    if (readerConfig.sessionizeTCPStreams) {
+    if (readerConfig.getSessionizeTCPStreams()) {
       // If the session has not been seen before, add it to the queue
       long sessionID = packet.getSessionHash();
       if (!sessionQueue.containsKey(sessionID)) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapFormatUtils.java b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/PcapFormatUtils.java
similarity index 97%
rename from exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapFormatUtils.java
rename to contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/PcapFormatUtils.java
index eb9ea05..6c23159 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapFormatUtils.java
+++ b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/PcapFormatUtils.java
@@ -40,7 +40,7 @@ public class PcapFormatUtils {
 
   /**
    *
-   * @param byteOrder true for forward file order, false fore revers file order
+   * @param byteOrder true for forward file order, false for reverse file order
    * @param buf byte buffer
    * @param offset buffer offset
    * @return short value as int of specific bytes from buffer
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/Murmur128.java b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/decoder/Murmur128.java
similarity index 100%
rename from exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/Murmur128.java
rename to contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/decoder/Murmur128.java
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/Packet.java b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/decoder/Packet.java
similarity index 100%
rename from exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/Packet.java
rename to contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/decoder/Packet.java
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/PacketConstants.java b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/decoder/PacketConstants.java
similarity index 100%
rename from exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/PacketConstants.java
rename to contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/decoder/PacketConstants.java
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/PacketDecoder.java b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/decoder/PacketDecoder.java
similarity index 87%
rename from exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/PacketDecoder.java
rename to contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/decoder/PacketDecoder.java
index 8e6d867..977e465 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/PacketDecoder.java
+++ b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/decoder/PacketDecoder.java
@@ -59,12 +59,15 @@ public class PacketDecoder {
   private static final int GLOBAL_HEADER_SIZE = 24;
   private static final int PCAP_MAGIC_LITTLE_ENDIAN = 0xD4C3B2A1;
   private static final int PCAP_MAGIC_NUMBER = 0xA1B2C3D4;
+  private static final int PCAPNG_MAGIC_LITTLE_ENDIAN = 0x4D3C2B1A;
+  private static final int PCAPNG_MAGIC_NUMBER = 0x0A0D0D0A;
 
   private static final Logger logger = LoggerFactory.getLogger(PacketDecoder.class);
 
   private final int maxLength;
   private final int network;
   private boolean bigEndian;
+  private FileFormat fileFormat;
 
   private InputStream input;
 
@@ -78,16 +81,28 @@ public class PacketDecoder {
     switch (getInt(globalHeader, 0)) {
       case PCAP_MAGIC_NUMBER:
         bigEndian = true;
+        fileFormat = FileFormat.PCAP;
         break;
       case PCAP_MAGIC_LITTLE_ENDIAN:
         bigEndian = false;
+        fileFormat = FileFormat.PCAP;
+        break;
+      case PCAPNG_MAGIC_NUMBER:
+        bigEndian = true;
+        fileFormat = FileFormat.PCAPNG;
+        break;
+      case PCAPNG_MAGIC_LITTLE_ENDIAN:
+        bigEndian = false;
+        fileFormat = FileFormat.PCAPNG;
         break;
       default:
         //noinspection ConstantConditions
         Preconditions.checkState(false,
             String.format("Bad magic number = %08x", getIntFileOrder(bigEndian, globalHeader, 0)));
     }
-    Preconditions.checkState(getShortFileOrder(bigEndian, globalHeader, 4) == 2, "Wanted major version == 2");
+    if(fileFormat == FileFormat.PCAP) {
+      Preconditions.checkState(getShortFileOrder(bigEndian, globalHeader, 4) == 2, "Wanted major version == 2");
+    } // TODO: add a precondition that the pcapng major version == 1
     maxLength = getIntFileOrder(bigEndian, globalHeader, 16);
     network = getIntFileOrder(bigEndian, globalHeader, 20);
   }
@@ -116,6 +131,10 @@ public class PacketDecoder {
     return bigEndian;
   }
 
+  public FileFormat getFileFormat() {
+    return fileFormat;
+  }
+
   public Packet nextPacket() throws IOException {
     Packet r = new Packet();
     if (r.readPcap(input, bigEndian, maxLength)) {
@@ -124,4 +143,10 @@ public class PacketDecoder {
       return null;
     }
   }
+
+  public enum FileFormat {
+    PCAP,
+    PCAPNG,
+    UNKNOWN
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/TcpHandshake.java b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/decoder/TcpHandshake.java
similarity index 100%
rename from exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/TcpHandshake.java
rename to contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/decoder/TcpHandshake.java
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/TcpSession.java b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/decoder/TcpSession.java
similarity index 100%
rename from exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/TcpSession.java
rename to contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/decoder/TcpSession.java
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/dto/ColumnDto.java b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/dto/ColumnDto.java
similarity index 100%
rename from exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/dto/ColumnDto.java
rename to contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/dto/ColumnDto.java
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/package-info.java b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/package-info.java
similarity index 100%
rename from exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/package-info.java
rename to contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/package-info.java
diff --git a/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/plugin/BasePcapFormatPlugin.java b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/plugin/BasePcapFormatPlugin.java
new file mode 100644
index 0000000..4052245
--- /dev/null
+++ b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/plugin/BasePcapFormatPlugin.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.pcap.plugin;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileReaderFactory;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileScanBuilder;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedScanFramework;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import org.apache.drill.exec.store.pcap.PcapBatchReader;
+import org.apache.drill.exec.store.pcap.decoder.PacketDecoder;
+import org.apache.drill.exec.store.pcapng.PcapngBatchReader;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+public abstract class BasePcapFormatPlugin<T extends PcapFormatConfig> extends EasyFormatPlugin<T> {
+
+  static final Logger logger = LoggerFactory.getLogger(ManagedScanFramework.class);
+  private static PacketDecoder.FileFormat fileFormat = PacketDecoder.FileFormat.UNKNOWN;
+
+  public BasePcapFormatPlugin(String name,
+                              DrillbitContext context,
+                              Configuration fsConf,
+                              StoragePluginConfig storageConfig,
+                              T formatConfig) {
+    super(name, easyConfig(fsConf, formatConfig), context, storageConfig, formatConfig);
+  }
+
+  private static EasyFormatConfig easyConfig(Configuration fsConf, PcapFormatConfig pluginConfig) {
+    return EasyFormatConfig.builder()
+        .readable(true)
+        .writable(false)
+        .blockSplittable(false)
+        .compressible(true)
+        .extensions(pluginConfig.getExtensions())
+        .fsConf(fsConf)
+        .useEnhancedScan(true)
+        .supportsLimitPushdown(true)
+        .supportsProjectPushdown(true)
+        .defaultName(PcapFormatConfig.NAME)
+        .build();
+  }
+
+  private static class PcapReaderFactory extends FileReaderFactory {
+
+    private final PcapFormatConfig config;
+    private final EasySubScan scan;
+
+    public PcapReaderFactory(PcapFormatConfig config, EasySubScan scan) {
+      this.config = config;
+      this.scan = scan;
+    }
+
+    /**
+     * Creates a reader. If the file format can't be detected, tries to use the default PCAP format plugin.
+     *
+     * @return PCAP or PCAPNG batch reader
+     */
+    @Override
+    public ManagedReader<? extends FileSchemaNegotiator> newReader() {
+      if (fileFramework().isPresent()) { // todo: can be simplified with java9 ifPresentOrElse
+        Path path = scan.getWorkUnits().stream()
+                .findFirst()
+                .orElseThrow(() -> UserException.
+                        dataReadError()
+                        .addContext("There are no files for scanning")
+                        .build(logger))
+                .getPath();
+        fileFormat = getFileFormat(fileFramework().get().fileSystem(), path);
+        if (config.getExtensions().stream()
+                .noneMatch(f -> f.equals(fileFormat.name().toLowerCase()))) {
+          logger.error("File format {} is not within plugin extensions: {}. Trying to use default PCAP format plugin to " +
+                  "read the file", fileFormat, config.getExtensions());
+        }
+      } else {
+        logger.error("It is not possible to detect file format, because the File Framework is not initialized. " +
+                "Trying to use default PCAP format plugin to read the file");
+      }
+      return createReader(scan, config);
+    }
+  }
+
+  @Override
+  public ManagedReader<? extends FileSchemaNegotiator> newBatchReader(EasySubScan scan, OptionManager options) {
+    return createReader(scan, formatConfig);
+  }
+
+  private static ManagedReader<? extends FileSchemaNegotiator> createReader(EasySubScan scan, PcapFormatConfig config) {
+    switch(fileFormat) {
+      case PCAPNG: return new PcapngBatchReader(config, scan);
+      case PCAP:
+      case UNKNOWN:
+      default: return new PcapBatchReader(config, scan.getMaxRecords());
+    }
+  }
+
+  @Override
+  protected FileScanBuilder frameworkBuilder(OptionManager options, EasySubScan scan) {
+    FileScanBuilder builder = new FileScanBuilder();
+    builder.setReaderFactory(new PcapReaderFactory(formatConfig, scan));
+
+    initScanBuilder(builder, scan);
+    builder.nullType(Types.optional(MinorType.VARCHAR));
+    return builder;
+  }
+
+  /**
+   * Helper method to detect the PCAP or PCAPNG file format based on the file's magic number.
+   *
+   * @param dfs file system used to open the input stream for the given path
+   * @return PCAP/PCAPNG file format
+   */
+  private static PacketDecoder.FileFormat getFileFormat(DrillFileSystem dfs, Path path) {
+    try (InputStream inputStream = dfs.openPossiblyCompressedStream(path)) {
+      PacketDecoder decoder = new PacketDecoder(inputStream);
+      return decoder.getFileFormat();
+    } catch (IOException io) {
+      throw UserException
+              .dataReadError(io)
+              .addContext("File name:", path.toString())
+              .build(logger);
+    }
+  }
+}
diff --git a/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatConfig.java b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/plugin/PcapFormatConfig.java
similarity index 59%
copy from contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatConfig.java
copy to contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/plugin/PcapFormatConfig.java
index 7210f93..b5f8d53 100644
--- a/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatConfig.java
+++ b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/plugin/PcapFormatConfig.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.exec.store.pcapng;
+package org.apache.drill.exec.store.pcap.plugin;
 
 import java.util.List;
 import java.util.Objects;
@@ -29,18 +29,23 @@ import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 
-@JsonTypeName(PcapngFormatConfig.NAME)
+@JsonTypeName(PcapFormatConfig.NAME)
 @JsonInclude(JsonInclude.Include.NON_DEFAULT)
-public class PcapngFormatConfig implements FormatPluginConfig {
+public class PcapFormatConfig implements FormatPluginConfig {
+  private static final List<String> DEFAULT_EXTNS = ImmutableList.of("pcap");
 
-  public static final String NAME = "pcapng";
+  public static final String NAME = "pcap";
   private final List<String> extensions;
   private final boolean stat;
+  private final boolean sessionizeTCPStreams;
 
   @JsonCreator
-  public PcapngFormatConfig(@JsonProperty("extensions") List<String> extensions, @JsonProperty("stat") boolean stat) {
-    this.extensions = extensions == null ? ImmutableList.of(PcapngFormatConfig.NAME) : ImmutableList.copyOf(extensions);
+  public PcapFormatConfig(@JsonProperty("extensions") List<String> extensions,
+                          @JsonProperty("stat") boolean stat,
+                          @JsonProperty("sessionizeTCPStreams") Boolean sessionizeTCPStreams) {
+    this.extensions = extensions == null ? DEFAULT_EXTNS : ImmutableList.copyOf(extensions);
     this.stat = stat;
+    this.sessionizeTCPStreams = sessionizeTCPStreams != null && sessionizeTCPStreams;
   }
 
   @JsonProperty("extensions")
@@ -53,6 +58,11 @@ public class PcapngFormatConfig implements FormatPluginConfig {
     return this.stat;
   }
 
+  @JsonProperty("sessionizeTCPStreams")
+  public boolean getSessionizeTCPStreams() {
+    return sessionizeTCPStreams;
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {
@@ -61,17 +71,22 @@ public class PcapngFormatConfig implements FormatPluginConfig {
     if (o == null || getClass() != o.getClass()) {
       return false;
     }
-    PcapngFormatConfig that = (PcapngFormatConfig) o;
-    return Objects.equals(extensions, that.extensions) && Objects.equals(stat, that.getStat());
+    PcapFormatConfig that = (PcapFormatConfig) o;
+    return Objects.equals(extensions, that.extensions) && Objects.equals(stat, that.getStat()) &&
+            Objects.equals(sessionizeTCPStreams, that.sessionizeTCPStreams);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(extensions, stat);
+    return Objects.hash(extensions, stat, sessionizeTCPStreams);
   }
 
   @Override
   public String toString() {
-    return new PlanStringBuilder(this).field("extensions", extensions).field("stat", stat).toString();
+    return new PlanStringBuilder(this)
+            .field("extensions", extensions)
+            .field("stat", stat)
+            .field("sessionizeTCPStreams", sessionizeTCPStreams)
+            .toString();
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalPStoreProvider.java b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/plugin/PcapFormatPlugin.java
similarity index 55%
rename from exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalPStoreProvider.java
rename to contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/plugin/PcapFormatPlugin.java
index a078a3e..34ba5c2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalPStoreProvider.java
+++ b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/plugin/PcapFormatPlugin.java
@@ -15,20 +15,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.exec.store.sys.local;
+package org.apache.drill.exec.store.pcap.plugin;
 
-import org.apache.drill.exec.exception.StoreException;
-import org.apache.drill.exec.store.sys.PersistentStoreRegistry;
-import org.apache.drill.exec.store.sys.store.provider.LocalPersistentStoreProvider;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.hadoop.conf.Configuration;
 
-/**
- * Kept for possible references to old class name in configuration.
- *
- * @deprecated will be removed in 1.7
- *    use {@link org.apache.drill.exec.store.sys.store.provider.LocalPersistentStoreProvider} instead.
- */
-public class LocalPStoreProvider extends LocalPersistentStoreProvider {
-  public LocalPStoreProvider(PersistentStoreRegistry registry) throws StoreException {
-    super(registry);
+public class PcapFormatPlugin extends BasePcapFormatPlugin<PcapFormatConfig> {
+
+  public PcapFormatPlugin(String name, DrillbitContext context, Configuration fsConf,
+                          StoragePluginConfig storageConfig, PcapFormatConfig formatConfig) {
+    super(name, context, fsConf, storageConfig, formatConfig);
   }
 }
diff --git a/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatConfig.java b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/plugin/PcapngFormatConfig.java
similarity index 51%
rename from contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatConfig.java
rename to contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/plugin/PcapngFormatConfig.java
index 7210f93..082840a 100644
--- a/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatConfig.java
+++ b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/plugin/PcapngFormatConfig.java
@@ -15,63 +15,26 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.exec.store.pcapng;
-
-import java.util.List;
-import java.util.Objects;
-
-import org.apache.drill.common.PlanStringBuilder;
-import org.apache.drill.common.logical.FormatPluginConfig;
-import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+package org.apache.drill.exec.store.pcap.plugin;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+
+import java.util.List;
 
 @JsonTypeName(PcapngFormatConfig.NAME)
 @JsonInclude(JsonInclude.Include.NON_DEFAULT)
-public class PcapngFormatConfig implements FormatPluginConfig {
-
+@Deprecated // for backward compatibility
+public class  PcapngFormatConfig extends PcapFormatConfig {
+  private static final List<String> DEFAULT_EXTNS = ImmutableList.of("pcapng");
   public static final String NAME = "pcapng";
-  private final List<String> extensions;
-  private final boolean stat;
 
   @JsonCreator
-  public PcapngFormatConfig(@JsonProperty("extensions") List<String> extensions, @JsonProperty("stat") boolean stat) {
-    this.extensions = extensions == null ? ImmutableList.of(PcapngFormatConfig.NAME) : ImmutableList.copyOf(extensions);
-    this.stat = stat;
-  }
-
-  @JsonProperty("extensions")
-  public List<String> getExtensions() {
-    return extensions;
-  }
-
-  @JsonProperty("stat")
-  public boolean getStat() {
-    return this.stat;
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || getClass() != o.getClass()) {
-      return false;
-    }
-    PcapngFormatConfig that = (PcapngFormatConfig) o;
-    return Objects.equals(extensions, that.extensions) && Objects.equals(stat, that.getStat());
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hash(extensions, stat);
-  }
-
-  @Override
-  public String toString() {
-    return new PlanStringBuilder(this).field("extensions", extensions).field("stat", stat).toString();
+  public PcapngFormatConfig(@JsonProperty("extensions") List<String> extensions,
+                            @JsonProperty("stat") boolean stat) {
+    super(extensions == null ? DEFAULT_EXTNS : ImmutableList.copyOf(extensions), stat, null);
   }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/TestWithZookeeper.java b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/plugin/PcapngFormatPlugin.java
similarity index 58%
copy from exec/java-exec/src/test/java/org/apache/drill/exec/TestWithZookeeper.java
copy to contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/plugin/PcapngFormatPlugin.java
index a55c7c3..a57e429 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/TestWithZookeeper.java
+++ b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/plugin/PcapngFormatPlugin.java
@@ -15,24 +15,17 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.exec;
+package org.apache.drill.exec.store.pcap.plugin;
 
-import org.junit.After;
-import org.junit.Before;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.hadoop.conf.Configuration;
 
-public class TestWithZookeeper extends ExecTest {
-//  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestWithZookeeper.class);
+@Deprecated // retained only for backward compatibility with existing "pcapng" format configurations
+public class PcapngFormatPlugin extends BasePcapFormatPlugin<PcapngFormatConfig> {
 
-  protected ZookeeperHelper zkHelper;
-
-  @Before
-  public void setUp() throws Exception {
-    zkHelper = new ZookeeperHelper();
-    zkHelper.startZookeeper(1);
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    zkHelper.stopZookeeper();
+  public PcapngFormatPlugin(String name, DrillbitContext context, Configuration fsConf,
+                            StoragePluginConfig storageConfig, PcapngFormatConfig formatConfig) {
+    super(name, context, fsConf, storageConfig, formatConfig);
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/schema/PcapTypes.java b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/schema/PcapTypes.java
similarity index 100%
rename from exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/schema/PcapTypes.java
rename to contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/schema/PcapTypes.java
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/schema/Schema.java b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/schema/Schema.java
similarity index 100%
rename from exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/schema/Schema.java
rename to contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/schema/Schema.java
diff --git a/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcapng/PcapngBatchReader.java b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcapng/PcapngBatchReader.java
index eeabebf..e8367c5 100644
--- a/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcapng/PcapngBatchReader.java
+++ b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcapng/PcapngBatchReader.java
@@ -40,6 +40,7 @@ import org.apache.drill.exec.record.metadata.SchemaBuilder;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import org.apache.drill.exec.store.pcap.plugin.PcapFormatConfig;
 import org.apache.drill.exec.util.Utilities;
 import org.apache.drill.exec.vector.accessor.ScalarWriter;
 import org.apache.hadoop.fs.Path;
@@ -54,7 +55,7 @@ public class PcapngBatchReader implements ManagedReader<FileSchemaNegotiator> {
 
   private static final Logger logger = LoggerFactory.getLogger(PcapngBatchReader.class);
 
-  private final PcapngFormatConfig config;
+  private final PcapFormatConfig config;
   private final EasySubScan scan;
   private final int maxRecords;
   private CustomErrorContext errorContext;
@@ -66,7 +67,7 @@ public class PcapngBatchReader implements ManagedReader<FileSchemaNegotiator> {
   private InputStream in;
   private Path path;
 
-  public PcapngBatchReader(final PcapngFormatConfig config, final EasySubScan scan) {
+  public PcapngBatchReader(final PcapFormatConfig config, final EasySubScan scan) {
     this.config = config;
     this.scan = scan;
     this.maxRecords = scan.getMaxRecords();
diff --git a/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatPlugin.java b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatPlugin.java
deleted file mode 100644
index 0cccd6b..0000000
--- a/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatPlugin.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.pcapng;
-
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.logical.StoragePluginConfig;
-import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileReaderFactory;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileScanBuilder;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
-import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
-import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.server.options.OptionManager;
-import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
-import org.apache.drill.exec.store.dfs.easy.EasySubScan;
-import org.apache.hadoop.conf.Configuration;
-
-public class PcapngFormatPlugin extends EasyFormatPlugin<PcapngFormatConfig> {
-
-  public PcapngFormatPlugin(String name,
-                            DrillbitContext context,
-                            Configuration fsConf,
-                            StoragePluginConfig storageConfig,
-                            PcapngFormatConfig formatConfig) {
-    super(name, easyConfig(fsConf, formatConfig), context, storageConfig, formatConfig);
-  }
-
-  private static EasyFormatConfig easyConfig(Configuration fsConf, PcapngFormatConfig pluginConfig) {
-    return EasyFormatConfig.builder()
-        .readable(true)
-        .writable(false)
-        .blockSplittable(false)
-        .compressible(true)
-        .extensions(pluginConfig.getExtensions())
-        .fsConf(fsConf)
-        .useEnhancedScan(true)
-        .supportsLimitPushdown(true)
-        .supportsProjectPushdown(true)
-        .defaultName(PcapngFormatConfig.NAME)
-        .build();
-  }
-
-  private static class PcapngReaderFactory extends FileReaderFactory {
-
-    private final PcapngFormatConfig config;
-    private final EasySubScan scan;
-
-    public PcapngReaderFactory(PcapngFormatConfig config, EasySubScan scan) {
-      this.config = config;
-      this.scan = scan;
-    }
-
-    @Override
-    public ManagedReader<? extends FileSchemaNegotiator> newReader() {
-      return new PcapngBatchReader(config, scan);
-    }
-  }
-
-  @Override
-  public ManagedReader<? extends FileSchemaNegotiator> newBatchReader(EasySubScan scan, OptionManager options)
-      throws ExecutionSetupException {
-    return new PcapngBatchReader(formatConfig, scan);
-  }
-
-  @Override
-  protected FileScanBuilder frameworkBuilder(OptionManager options, EasySubScan scan) throws ExecutionSetupException {
-    FileScanBuilder builder = new FileScanBuilder();
-    builder.setReaderFactory(new PcapngReaderFactory(formatConfig, scan));
-
-    initScanBuilder(builder, scan);
-    builder.nullType(Types.optional(MinorType.VARCHAR));
-    return builder;
-  }
-}
diff --git a/contrib/format-pcapng/src/main/resources/bootstrap-format-plugins.json b/contrib/format-pcapng/src/main/resources/bootstrap-format-plugins.json
index 60b04d1..550e6e5 100644
--- a/contrib/format-pcapng/src/main/resources/bootstrap-format-plugins.json
+++ b/contrib/format-pcapng/src/main/resources/bootstrap-format-plugins.json
@@ -3,8 +3,9 @@
     "dfs": {
       "type": "file",
       "formats": {
-        "pcapng": {
-          "type": "pcapng",
+        "pcap": {
+          "type": "pcap",
+          "extensions": ["pcap", "pcapng"],
           "stat" : false
         }
       }
@@ -12,8 +13,9 @@
     "cp": {
       "type": "file",
       "formats": {
-        "pcapng": {
-          "type": "pcapng",
+        "pcap": {
+          "type": "pcap",
+          "extensions": ["pcap", "pcapng"],
           "stat" : false
         }
       }
@@ -21,8 +23,9 @@
     "s3": {
       "type": "file",
       "formats": {
-        "pcapng": {
-          "type": "pcapng",
+        "pcap": {
+          "type": "pcap",
+          "extensions": ["pcap", "pcapng"],
           "stat" : false
         }
       }
diff --git a/contrib/format-pcapng/src/test/java/org/apache/drill/exec/persistent/store/pcap/TestPcapWithPersistentStore.java b/contrib/format-pcapng/src/test/java/org/apache/drill/exec/persistent/store/pcap/TestPcapWithPersistentStore.java
new file mode 100644
index 0000000..1328455
--- /dev/null
+++ b/contrib/format-pcapng/src/test/java/org/apache/drill/exec/persistent/store/pcap/TestPcapWithPersistentStore.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.persistent.store.pcap;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.common.util.DrillFileUtils;
+import org.apache.drill.exec.TestWithZookeeper;
+import org.apache.drill.exec.coord.zk.PathUtils;
+import org.apache.drill.exec.coord.zk.ZookeeperClient;
+import org.apache.drill.exec.store.dfs.FileSystemConfig;
+import org.apache.drill.exec.store.pcap.plugin.PcapFormatConfig;
+import org.apache.drill.exec.store.pcap.plugin.PcapngFormatConfig;
+import org.apache.drill.exec.store.sys.PersistentStore;
+import org.apache.drill.exec.store.sys.PersistentStoreConfig;
+import org.apache.drill.exec.store.sys.store.ZookeeperPersistentStore;
+import org.apache.drill.exec.store.sys.store.provider.ZookeeperPersistentStoreProvider;
+import org.apache.zookeeper.CreateMode;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Map;
+
+import static org.junit.Assert.assertTrue;
+
+public class TestPcapWithPersistentStore extends TestWithZookeeper {
+    /**
+     * DRILL-7828: checks that a stored plugin config with separate "pcap" and "pcapng" format entries still loads.
+     * Note: If this test breaks, you are probably breaking backward and forward compatibility. Verify with the
+     * community that breaking compatibility is acceptable and planned for.
+     */
+    @Test
+    public void pcapPluginBackwardCompatabilityTest() throws Exception {
+        final String oldPlugin = "oldFormatPlugin";
+
+        try (CuratorFramework curator = createCurator()) {
+            curator.start();
+            ObjectMapper objectMapper = new ObjectMapper();
+            objectMapper.registerSubtypes(PcapFormatConfig.class, PcapngFormatConfig.class);
+            PersistentStoreConfig<FileSystemConfig> storeConfig =
+                    PersistentStoreConfig.newJacksonBuilder(objectMapper, FileSystemConfig.class).name("type").build();
+
+
+            try (ZookeeperClient zkClient = new ZookeeperClient(curator,
+                    PathUtils.join("/", storeConfig.getName()), CreateMode.PERSISTENT)) {
+                zkClient.start();
+                String oldFormatPlugin = DrillFileUtils.getResourceAsString("/config/oldPcapPlugins.json");
+                zkClient.put(oldPlugin, oldFormatPlugin.getBytes(), null);
+            }
+
+            try (ZookeeperPersistentStoreProvider provider =
+                         new ZookeeperPersistentStoreProvider(zkHelper.getConfig(), curator)) {
+                PersistentStore<FileSystemConfig> store = provider.getOrCreateStore(storeConfig);
+                assertTrue(store instanceof ZookeeperPersistentStore);
+
+                FileSystemConfig oldPluginConfig = ((ZookeeperPersistentStore<FileSystemConfig>)store).get(oldPlugin, null);
+                Map<String, FormatPluginConfig> formats = oldPluginConfig.getFormats();
+                Assert.assertEquals(formats.keySet(), ImmutableSet.of("pcap", "pcapng"));
+                PcapFormatConfig pcap = (PcapFormatConfig) formats.get("pcap");
+                PcapngFormatConfig pcapng = (PcapngFormatConfig) formats.get("pcapng");
+                Assert.assertEquals(pcap.getExtensions(), ImmutableList.of("pcap"));
+                assertTrue(pcapng.getStat());
+            }
+        }
+    }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/ConcatPcap.java b/contrib/format-pcapng/src/test/java/org/apache/drill/exec/store/pcap/ConcatPcap.java
similarity index 100%
rename from exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/ConcatPcap.java
rename to contrib/format-pcapng/src/test/java/org/apache/drill/exec/store/pcap/ConcatPcap.java
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapDecoder.java b/contrib/format-pcapng/src/test/java/org/apache/drill/exec/store/pcap/TestPcapDecoder.java
similarity index 98%
rename from exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapDecoder.java
rename to contrib/format-pcapng/src/test/java/org/apache/drill/exec/store/pcap/TestPcapDecoder.java
index 1a29902..d171c31 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapDecoder.java
+++ b/contrib/format-pcapng/src/test/java/org/apache/drill/exec/store/pcap/TestPcapDecoder.java
@@ -57,7 +57,7 @@ public class TestPcapDecoder extends BaseTestQuery {
 
   @Test
   public void testBasics() throws IOException {
-    InputStream in = Resources.getResource("store/pcap/tcp-2.pcap").openStream();
+    InputStream in = Resources.getResource("pcap/tcp-2.pcap").openStream();
     PacketDecoder pd = new PacketDecoder(in);
     Packet p = pd.packet();
     int offset = 0;
@@ -226,7 +226,7 @@ public class TestPcapDecoder extends BaseTestQuery {
         // might be faster to keep this open and rewind each time, but
         // that is hard to do with a resource, especially if it comes
         // from the class path instead of files.
-        try (InputStream in = Resources.getResource("store/pcap/tcp-2.pcap").openStream()) {
+        try (InputStream in = Resources.getResource("pcap/tcp-2.pcap").openStream()) {
           ConcatPcap.copy(first, in, out);
         }
         first = false;
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapEVFReader.java b/contrib/format-pcapng/src/test/java/org/apache/drill/exec/store/pcap/TestPcapEVFReader.java
similarity index 92%
rename from exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapEVFReader.java
rename to contrib/format-pcapng/src/test/java/org/apache/drill/exec/store/pcap/TestPcapEVFReader.java
index 5796fa8..9a27276 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapEVFReader.java
+++ b/contrib/format-pcapng/src/test/java/org/apache/drill/exec/store/pcap/TestPcapEVFReader.java
@@ -19,6 +19,7 @@
 package org.apache.drill.exec.store.pcap;
 
 import org.apache.drill.categories.RowSetTests;
+import org.apache.drill.exec.store.pcap.plugin.PcapFormatConfig;
 import org.apache.drill.test.ClusterFixture;
 import org.apache.drill.test.ClusterTest;
 import org.junit.BeforeClass;
@@ -34,12 +35,12 @@ public class TestPcapEVFReader extends ClusterTest {
   @BeforeClass
   public static void setup() throws Exception {
     ClusterTest.startCluster(ClusterFixture.builder(dirTestWatcher));
-    cluster.defineFormat("cp", "sample", new PcapFormatConfig(null, null));
+    cluster.defineFormat("cp", "sample", new PcapFormatConfig(null, true, false));
   }
 
   @Test
   public void testStarQuery() throws Exception {
-    String sql = "SELECT * FROM cp.`store/pcap/synscan.pcap` LIMIT 1";
+    String sql = "SELECT * FROM cp.`pcap/synscan.pcap` LIMIT 1";
 
     testBuilder()
       .sqlQuery(sql)
@@ -58,7 +59,7 @@ public class TestPcapEVFReader extends ClusterTest {
       "packet_length, tcp_session, " +
       "tcp_sequence, tcp_ack, tcp_flags," +
       " tcp_parsed_flags, tcp_flags_ns, tcp_flags_cwr, tcp_flags_ece, tcp_flags_ece_ecn_capable, tcp_flags_ece_congestion_experienced, tcp_flags_urg, tcp_flags_ack, tcp_flags_psh, tcp_flags_rst, tcp_flags_syn," +
-      " tcp_flags_fin, data, is_corrupt FROM cp.`store/pcap/synscan.pcap` LIMIT 1";
+      " tcp_flags_fin, data, is_corrupt FROM cp.`pcap/synscan.pcap` LIMIT 1";
 
     testBuilder()
       .sqlQuery(sql)
@@ -72,7 +73,7 @@ public class TestPcapEVFReader extends ClusterTest {
 
   @Test
   public void testAggregateQuery() throws Exception {
-    String sql = "SELECT is_corrupt, COUNT(*) as packet_count FROM cp.`store/pcap/testv1.pcap` GROUP BY is_corrupt ORDER BY packet_count DESC";
+    String sql = "SELECT is_corrupt, COUNT(*) as packet_count FROM cp.`pcap/testv1.pcap` GROUP BY is_corrupt ORDER BY packet_count DESC";
 
     testBuilder()
       .sqlQuery(sql)
@@ -85,7 +86,7 @@ public class TestPcapEVFReader extends ClusterTest {
 
   @Test
   public void testArpPcapFile() throws Exception {
-    String sql = "SELECT src_ip, dst_ip FROM cp.`store/pcap/arpWithNullIP.pcap` WHERE src_port=1";
+    String sql = "SELECT src_ip, dst_ip FROM cp.`pcap/arpWithNullIP.pcap` WHERE src_port=1";
     testBuilder()
       .sqlQuery(sql)
       .ordered()
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapRecordReader.java b/contrib/format-pcapng/src/test/java/org/apache/drill/exec/store/pcap/TestPcapRecordReader.java
similarity index 69%
rename from exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapRecordReader.java
rename to contrib/format-pcapng/src/test/java/org/apache/drill/exec/store/pcap/TestPcapRecordReader.java
index e1a71b6..43fc3ff3 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapRecordReader.java
+++ b/contrib/format-pcapng/src/test/java/org/apache/drill/exec/store/pcap/TestPcapRecordReader.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store.pcap;
 
+import org.apache.drill.PlanTestBase;
 import org.apache.drill.exec.store.pcap.decoder.Packet;
 import org.apache.drill.test.BaseTestQuery;
 import org.apache.drill.exec.rpc.user.QueryDataBatch;
@@ -31,42 +32,42 @@ import static org.junit.Assert.assertEquals;
 public class TestPcapRecordReader extends BaseTestQuery {
   @BeforeClass
   public static void setupTestFiles() {
-    dirTestWatcher.copyResourceToRoot(Paths.get("store", "pcap"));
+    dirTestWatcher.copyResourceToRoot(Paths.get("pcap"));
   }
 
   @Test
   public void testStarQuery() throws Exception {
-    runSQLVerifyCount("select * from dfs.`store/pcap/tcp-1.pcap`", 16);
-    runSQLVerifyCount("select distinct DST_IP from dfs.`store/pcap/tcp-1.pcap`", 1);
-    runSQLVerifyCount("select distinct DsT_IP from dfs.`store/pcap/tcp-1.pcap`", 1);
-    runSQLVerifyCount("select distinct dst_ip from dfs.`store/pcap/tcp-1.pcap`", 1);
+    runSQLVerifyCount("select * from dfs.`pcap/tcp-1.pcap`", 16);
+    runSQLVerifyCount("select distinct DST_IP from dfs.`pcap/tcp-1.pcap`", 1);
+    runSQLVerifyCount("select distinct DsT_IP from dfs.`pcap/tcp-1.pcap`", 1);
+    runSQLVerifyCount("select distinct dst_ip from dfs.`pcap/tcp-1.pcap`", 1);
   }
 
   @Test
   public void testCorruptPCAPQuery() throws Exception {
-    runSQLVerifyCount("select * from dfs.`store/pcap/testv1.pcap`", 7000);
+    runSQLVerifyCount("select * from dfs.`pcap/testv1.pcap`", 7000);
   }
 
   @Test
   public void testTrueCorruptPCAPQuery() throws Exception {
-    runSQLVerifyCount("select * from dfs.`store/pcap/testv1.pcap` WHERE is_corrupt=true", 16);
+    runSQLVerifyCount("select * from dfs.`pcap/testv1.pcap` WHERE is_corrupt=true", 16);
   }
 
   @Test
   public void testNotCorruptPCAPQuery() throws Exception {
-    runSQLVerifyCount("select * from dfs.`store/pcap/testv1.pcap` WHERE is_corrupt=false", 6984);
+    runSQLVerifyCount("select * from dfs.`pcap/testv1.pcap` WHERE is_corrupt=false", 6984);
   }
 
   @Test
   public void testCountQuery() throws Exception {
-    runSQLVerifyCount("select count(*) from dfs.`store/pcap/tcp-1.pcap`", 1);
-    runSQLVerifyCount("select count(*) from dfs.`store/pcap/tcp-2.pcap`", 1);
+    runSQLVerifyCount("select count(*) from dfs.`pcap/tcp-1.pcap`", 1);
+    runSQLVerifyCount("select count(*) from dfs.`pcap/tcp-2.pcap`", 1);
   }
 
   @Test
   public void testDistinctQuery() throws Exception {
     // omit data field from distinct count for now
-    runSQLVerifyCount("select distinct type, network, `timestamp`, src_ip, dst_ip, src_port, dst_port, src_mac_address, dst_mac_address, tcp_session, packet_length from dfs.`store/pcap/tcp-1.pcap`", 1);
+    runSQLVerifyCount("select distinct type, network, `timestamp`, src_ip, dst_ip, src_port, dst_port, src_mac_address, dst_mac_address, tcp_session, packet_length from dfs.`pcap/tcp-1.pcap`", 1);
   }
 
   @Test
@@ -86,7 +87,17 @@ public class TestPcapRecordReader extends BaseTestQuery {
 
   @Test
   public void checkFlags() throws Exception {
-    runSQLVerifyCount("select tcp_session, tcp_ack, tcp_flags from dfs.`store/pcap/synscan.pcap`", 2011);
+    runSQLVerifyCount("select tcp_session, tcp_ack, tcp_flags from dfs.`pcap/synscan.pcap`", 2011);
+  }
+
+  @Test
+  public void testSerDe() throws Exception {
+    String path = "pcap/tcp-1.pcap";
+    dirTestWatcher.copyResourceToRoot(Paths.get(path));
+    testPhysicalPlanSubmission(
+            String.format("select * from dfs.`%s`", path),
+            String.format("select * from table(dfs.`%s`(type=>'pcap'))", path)
+    );
   }
 
   private void runSQLVerifyCount(String sql, int expectedRowCount) throws Exception {
@@ -106,4 +117,10 @@ public class TestPcapRecordReader extends BaseTestQuery {
     }
     assertEquals(expectedRowCount, count);
   }
+
+  private void testPhysicalPlanSubmission(String...queries) throws Exception {
+    for (String query : queries) {
+      PlanTestBase.testPhysicalPlanExecutionBasedOnQuery(query);
+    }
+  }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestSessionizePCAP.java b/contrib/format-pcapng/src/test/java/org/apache/drill/exec/store/pcap/TestSessionizePCAP.java
similarity index 95%
rename from exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestSessionizePCAP.java
rename to contrib/format-pcapng/src/test/java/org/apache/drill/exec/store/pcap/TestSessionizePCAP.java
index addc1a1..001576d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestSessionizePCAP.java
+++ b/contrib/format-pcapng/src/test/java/org/apache/drill/exec/store/pcap/TestSessionizePCAP.java
@@ -23,6 +23,7 @@ import org.apache.drill.exec.physical.rowSet.RowSet;
 import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
 import org.apache.drill.exec.record.metadata.SchemaBuilder;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.pcap.plugin.PcapFormatConfig;
 import org.apache.drill.test.ClusterFixture;
 import org.apache.drill.test.ClusterTest;
 import org.apache.drill.test.QueryBuilder;
@@ -43,15 +44,15 @@ public class TestSessionizePCAP extends ClusterTest {
   public static void setup() throws Exception {
     ClusterTest.startCluster(ClusterFixture.builder(dirTestWatcher));
 
-    PcapFormatConfig sampleConfig = new PcapFormatConfig(null, true);
+    PcapFormatConfig sampleConfig = new PcapFormatConfig(null, true, true);
     cluster.defineFormat("cp", "pcap", sampleConfig);
-    dirTestWatcher.copyResourceToRoot(Paths.get("store/pcap/"));
+    dirTestWatcher.copyResourceToRoot(Paths.get("pcap/"));
   }
 
   @Test
   public void testSessionizedStarQuery() throws Exception {
-    String sql = "SELECT * FROM cp.`/store/pcap/http.pcap`";
-    String dataFromRemote = readAFileIntoString(dirTestWatcher.getRootDir().getAbsolutePath() + "/store/pcap/dataFromRemote.txt");
+    String sql = "SELECT * FROM cp.`/pcap/http.pcap`";
+    String dataFromRemote = readAFileIntoString(dirTestWatcher.getRootDir().getAbsolutePath() + "/pcap/dataFromRemote.txt");
 
     QueryBuilder q = client.queryBuilder().sql(sql);
     RowSet results = q.rowSet();
@@ -104,9 +105,9 @@ public class TestSessionizePCAP extends ClusterTest {
     String sql = "SELECT src_ip, dst_ip, src_port, dst_port, src_mac_address, dst_mac_address," +
       "session_start_time, session_end_time, session_duration, total_packet_count, data_volume_from_origin, data_volume_from_remote," +
       "packet_count_from_origin, packet_count_from_remote, connection_time, tcp_session, is_corrupt, data_from_originator, data_from_remote " +
-      "FROM cp.`/store/pcap/http.pcap`";
+      "FROM cp.`/pcap/http.pcap`";
 
-    String dataFromRemote = readAFileIntoString(dirTestWatcher.getRootDir().getAbsolutePath() + "/store/pcap/dataFromRemote.txt");
+    String dataFromRemote = readAFileIntoString(dirTestWatcher.getRootDir().getAbsolutePath() + "/pcap/dataFromRemote.txt");
 
     QueryBuilder q = client.queryBuilder().sql(sql);
     RowSet results = q.rowSet();
@@ -156,7 +157,7 @@ public class TestSessionizePCAP extends ClusterTest {
 
   @Test
   public void testSerDe() throws Exception {
-    String sql = "SELECT COUNT(*) FROM cp.`/store/pcap/http.pcap`";
+    String sql = "SELECT COUNT(*) FROM cp.`/pcap/http.pcap`";
     String plan = queryBuilder().sql(sql).explainJson();
     long cnt = queryBuilder().physical(plan).singletonLong();
     assertEquals("Counts should match", 1L, cnt);
diff --git a/contrib/format-pcapng/src/test/java/org/apache/drill/exec/store/pcapng/TestPcapngRecordReader.java b/contrib/format-pcapng/src/test/java/org/apache/drill/exec/store/pcapng/TestPcapngRecordReader.java
index b069432..ceb76bf 100644
--- a/contrib/format-pcapng/src/test/java/org/apache/drill/exec/store/pcapng/TestPcapngRecordReader.java
+++ b/contrib/format-pcapng/src/test/java/org/apache/drill/exec/store/pcapng/TestPcapngRecordReader.java
@@ -35,6 +35,7 @@ import org.apache.drill.test.QueryBuilder;
 import org.apache.drill.test.QueryTestUtil;
 import org.apache.drill.test.rowSet.RowSetComparison;
 import org.junit.BeforeClass;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -45,6 +46,7 @@ public class TestPcapngRecordReader extends ClusterTest {
   public static void setup() throws Exception {
     ClusterTest.startCluster(ClusterFixture.builder(dirTestWatcher));
     dirTestWatcher.copyResourceToRoot(Paths.get("pcapng/"));
+    dirTestWatcher.copyResourceToRoot(Paths.get("todo/"));
   }
 
   @Test
@@ -190,6 +192,20 @@ public class TestPcapngRecordReader extends ClusterTest {
   }
 
   @Test
+  @Ignore // TODO: re-enable once the infinite loop in the current PcapNGReader is fixed
+  public void testPcapNG() throws Exception {
+//    String sql = "select * from dfs.`todo/dhcp_big_endian.pcapng` limit 1"; // Bad magic number = 000a0a0a
+//    String sql = "select * from dfs.`todo/dhcp_little_endian.pcapng` limit 1"; // Bad magic number = 1c0a0a0a
+//    String sql = "select * from dfs.`todo/many_interfaces.pcapng` limit 1"; // Bad magic number = ef0a0a0a
+    String sql = "select * from dfs.`todo/mac2.pcap` limit 1";  // Bad magic number = 1c0a0a0a
+    QueryBuilder builder = client.queryBuilder().sql(sql);
+    RowSet sets = builder.rowSet();
+
+    assertEquals(1, sets.rowCount());
+    sets.clear();
+  }
+
+  @Test
   public void testGroupBy() throws Exception {
     String sql = "select src_ip, count(1), sum(packet_length) from dfs.`pcapng/sniff.pcapng` group by src_ip";
     QueryBuilder builder = client.queryBuilder().sql(sql);
@@ -214,4 +230,34 @@ public class TestPcapngRecordReader extends ClusterTest {
     String sql = "select * from dfs.`pcapng/drill.pcapng`";
     client.queryBuilder().sql(sql).rowSet();
   }
-}
\ No newline at end of file
+
+  @Test
+  public void testPcapNGFileWithPcapExt() throws Exception {
+    String sql = "select count(*) from dfs.`pcapng/example.pcap`";
+    String plan = queryBuilder().sql(sql).explainJson();
+    long cnt = queryBuilder().physical(plan).singletonLong();
+
+    assertEquals("Counts should match", 1, cnt);
+  }
+
+  @Test
+  public void testInlineSchema() throws Exception {
+    String sql =   "SELECT type, packet_length, `timestamp` FROM table(dfs.`pcapng/sniff.pcapng` " +
+            "(type => 'pcapng', stat => false, sessionizeTCPStreams => true )) where type = 'ARP'";
+    RowSet sets = client.queryBuilder().sql(sql).rowSet();
+
+    TupleMetadata schema = new SchemaBuilder()
+            .addNullable("type", MinorType.VARCHAR)
+            .add("packet_length", MinorType.INT)
+            .add("timestamp", MinorType.TIMESTAMP)
+            .buildSchema();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), schema)
+            .addRow("ARP", 90, Instant.ofEpochMilli(1518010669927L))
+            .addRow("ARP", 90, Instant.ofEpochMilli(1518010671874L))
+            .build();
+
+    assertEquals(2, sets.rowCount());
+    new RowSetComparison(expected).verifyAndClearAll(sets);
+  }
+}
diff --git a/contrib/format-pcapng/src/test/java/org/apache/drill/exec/store/pcapng/TestPcapngStatRecordReader.java b/contrib/format-pcapng/src/test/java/org/apache/drill/exec/store/pcapng/TestPcapngStatRecordReader.java
index d1d966e..9971a88 100644
--- a/contrib/format-pcapng/src/test/java/org/apache/drill/exec/store/pcapng/TestPcapngStatRecordReader.java
+++ b/contrib/format-pcapng/src/test/java/org/apache/drill/exec/store/pcapng/TestPcapngStatRecordReader.java
@@ -27,6 +27,8 @@ import org.apache.drill.exec.physical.rowSet.RowSet;
 import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
 import org.apache.drill.exec.record.metadata.SchemaBuilder;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.pcap.plugin.PcapFormatConfig;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
 import org.apache.drill.test.ClusterFixture;
 import org.apache.drill.test.ClusterTest;
 import org.apache.drill.test.QueryBuilder;
@@ -41,7 +43,7 @@ public class TestPcapngStatRecordReader extends ClusterTest {
   @BeforeClass
   public static void setup() throws Exception {
     ClusterTest.startCluster(ClusterFixture.builder(dirTestWatcher));
-    cluster.defineFormat("dfs", "pcapng", new PcapngFormatConfig(null, true));
+    cluster.defineFormat("dfs", "pcapng", new PcapFormatConfig(ImmutableList.of("pcapng"), true, false));
     dirTestWatcher.copyResourceToRoot(Paths.get("pcapng/"));
   }
 
@@ -136,4 +138,4 @@ public class TestPcapngStatRecordReader extends ClusterTest {
     RowSet expected = new RowSetBuilder(client.allocator(), schema).build();
     new RowSetComparison(expected).verifyAndClearAll(sets);
   }
-}
\ No newline at end of file
+}
diff --git a/contrib/format-pcapng/src/test/resources/config/oldPcapPlugins.json b/contrib/format-pcapng/src/test/resources/config/oldPcapPlugins.json
new file mode 100644
index 0000000..c8c6645
--- /dev/null
+++ b/contrib/format-pcapng/src/test/resources/config/oldPcapPlugins.json
@@ -0,0 +1,12 @@
+{
+  "type": "file",
+  "formats": {
+    "pcapng": {
+      "type": "pcapng",
+      "stat" : true
+    },
+    "pcap" : {
+      "type" : "pcap"
+    }
+  }
+}
\ No newline at end of file
diff --git a/exec/java-exec/src/test/resources/store/pcap/arpWithNullIP.pcap b/contrib/format-pcapng/src/test/resources/pcap/arpWithNullIP.pcap
similarity index 100%
rename from exec/java-exec/src/test/resources/store/pcap/arpWithNullIP.pcap
rename to contrib/format-pcapng/src/test/resources/pcap/arpWithNullIP.pcap
diff --git a/exec/java-exec/src/test/resources/store/pcap/data-1.pcap b/contrib/format-pcapng/src/test/resources/pcap/data-1.pcap
similarity index 100%
rename from exec/java-exec/src/test/resources/store/pcap/data-1.pcap
rename to contrib/format-pcapng/src/test/resources/pcap/data-1.pcap
diff --git a/contrib/format-pcapng/src/test/resources/pcap/data-2.pcap b/contrib/format-pcapng/src/test/resources/pcap/data-2.pcap
new file mode 100644
index 0000000..a9a4563
Binary files /dev/null and b/contrib/format-pcapng/src/test/resources/pcap/data-2.pcap differ
diff --git a/exec/java-exec/src/test/resources/store/pcap/dataFromRemote.txt b/contrib/format-pcapng/src/test/resources/pcap/dataFromRemote.txt
similarity index 100%
rename from exec/java-exec/src/test/resources/store/pcap/dataFromRemote.txt
rename to contrib/format-pcapng/src/test/resources/pcap/dataFromRemote.txt
diff --git a/exec/java-exec/src/test/resources/store/pcap/http.pcap b/contrib/format-pcapng/src/test/resources/pcap/http.pcap
similarity index 100%
rename from exec/java-exec/src/test/resources/store/pcap/http.pcap
rename to contrib/format-pcapng/src/test/resources/pcap/http.pcap
diff --git a/exec/java-exec/src/test/resources/store/pcap/synscan.pcap b/contrib/format-pcapng/src/test/resources/pcap/synscan.pcap
similarity index 100%
rename from exec/java-exec/src/test/resources/store/pcap/synscan.pcap
rename to contrib/format-pcapng/src/test/resources/pcap/synscan.pcap
diff --git a/exec/java-exec/src/test/resources/store/pcap/tcp-1.pcap b/contrib/format-pcapng/src/test/resources/pcap/tcp-1.pcap
similarity index 100%
rename from exec/java-exec/src/test/resources/store/pcap/tcp-1.pcap
rename to contrib/format-pcapng/src/test/resources/pcap/tcp-1.pcap
diff --git a/exec/java-exec/src/test/resources/store/pcap/tcp-2.pcap b/contrib/format-pcapng/src/test/resources/pcap/tcp-2.pcap
similarity index 100%
rename from exec/java-exec/src/test/resources/store/pcap/tcp-2.pcap
rename to contrib/format-pcapng/src/test/resources/pcap/tcp-2.pcap
diff --git a/exec/java-exec/src/test/resources/store/pcap/testv1.pcap b/contrib/format-pcapng/src/test/resources/pcap/testv1.pcap
similarity index 100%
rename from exec/java-exec/src/test/resources/store/pcap/testv1.pcap
rename to contrib/format-pcapng/src/test/resources/pcap/testv1.pcap
diff --git a/contrib/format-pcapng/src/test/resources/pcapng/example.pcap b/contrib/format-pcapng/src/test/resources/pcapng/example.pcap
new file mode 100644
index 0000000..002cb8d
Binary files /dev/null and b/contrib/format-pcapng/src/test/resources/pcapng/example.pcap differ
diff --git a/contrib/format-pcapng/src/test/resources/todo/dhcp.pcapng b/contrib/format-pcapng/src/test/resources/todo/dhcp.pcapng
new file mode 100644
index 0000000..d7d2e33
Binary files /dev/null and b/contrib/format-pcapng/src/test/resources/todo/dhcp.pcapng differ
diff --git a/contrib/format-pcapng/src/test/resources/todo/dhcp_big_endian.pcapng b/contrib/format-pcapng/src/test/resources/todo/dhcp_big_endian.pcapng
new file mode 100644
index 0000000..894b361
Binary files /dev/null and b/contrib/format-pcapng/src/test/resources/todo/dhcp_big_endian.pcapng differ
diff --git a/contrib/format-pcapng/src/test/resources/todo/dhcp_little_endian.pcapng b/contrib/format-pcapng/src/test/resources/todo/dhcp_little_endian.pcapng
new file mode 100644
index 0000000..3378440
Binary files /dev/null and b/contrib/format-pcapng/src/test/resources/todo/dhcp_little_endian.pcapng differ
diff --git a/contrib/format-pcapng/src/test/resources/todo/many_interfaces.pcapng b/contrib/format-pcapng/src/test/resources/todo/many_interfaces.pcapng
new file mode 100644
index 0000000..6a8b397
Binary files /dev/null and b/contrib/format-pcapng/src/test/resources/todo/many_interfaces.pcapng differ
diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseTableProvider.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseTableProvider.java
index 43fa88b..592479e 100644
--- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseTableProvider.java
+++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseTableProvider.java
@@ -49,7 +49,8 @@ public class TestHBaseTableProvider extends BaseHBaseTest {
   @Test
   public void testTableProvider() throws StoreException {
     LogicalPlanPersistence lp = PhysicalPlanReaderTestFactory.defaultLogicalPlanPersistence(config);
-    PersistentStore<String> hbaseStore = provider.getOrCreateStore(PersistentStoreConfig.newJacksonBuilder(lp.getMapper(), String.class).name("hbase").build());
+    PersistentStore<String> hbaseStore = provider.getOrCreateStore(
+            PersistentStoreConfig.newJacksonBuilder(lp.getMapper(), String.class).name("hbase").build());
     hbaseStore.put("", "v0");
     hbaseStore.put("k1", "v1");
     hbaseStore.put("k2", "v2");
@@ -66,7 +67,8 @@ public class TestHBaseTableProvider extends BaseHBaseTest {
 
     assertEquals(7, Lists.newArrayList(hbaseStore.getAll()).size());
 
-    PersistentStore<String> hbaseTestStore = provider.getOrCreateStore(PersistentStoreConfig.newJacksonBuilder(lp.getMapper(), String.class).name("hbase.test").build());
+    PersistentStore<String> hbaseTestStore = provider.getOrCreateStore(
+            PersistentStoreConfig.newJacksonBuilder(lp.getMapper(), String.class).name("hbase.test").build());
     hbaseTestStore.put("", "v0");
     hbaseTestStore.put("k1", "v1");
     hbaseTestStore.put("k2", "v2");
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsScanFramework.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsScanFramework.java
index 842c798..9fe2f94 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsScanFramework.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsScanFramework.java
@@ -62,7 +62,6 @@ public class ColumnsScanFramework extends FileScanFramework {
   /**
    * Implementation of the columns array schema negotiator.
    */
-
   public static class ColumnsSchemaNegotiatorImpl extends FileSchemaNegotiatorImpl
           implements ColumnsSchemaNegotiator {
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileScanFramework.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileScanFramework.java
index b241528..12e1bd0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileScanFramework.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileScanFramework.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Optional;
 
 import org.apache.drill.common.exceptions.ChildErrorContext;
 import org.apache.drill.common.exceptions.CustomErrorContext;
@@ -188,11 +189,18 @@ public class FileScanFramework extends ManagedScanFramework {
     }
 
     public abstract ManagedReader<? extends FileSchemaNegotiator> newReader();
+
+    /**
+     * @return the FileScanFramework, or an empty Optional if it has not yet been bound via {@link #bind(ManagedScanFramework)}
+     */
+    protected Optional<FileScanFramework> fileFramework() {
+      return Optional.ofNullable(fileFramework);
+    }
   }
 
   private ImplicitColumnManager metadataManager;
   private DrillFileSystem dfs;
-  private final List<FileSplit> spilts = new ArrayList<>();
+  private final List<FileSplit> splits = new ArrayList<>();
   private Iterator<FileSplit> splitIter;
   private FileSplit currentSplit;
 
@@ -230,9 +238,9 @@ public class FileScanFramework extends ManagedScanFramework {
       Path path = dfs.makeQualified(work.getPath());
       paths.add(path);
       FileSplit split = new FileSplit(path, work.getStart(), work.getLength(), new String[]{""});
-      spilts.add(split);
+      splits.add(split);
     }
-    splitIter = spilts.iterator();
+    splitIter = splits.iterator();
 
     // Create the metadata manager to handle file metadata columns
     // (so-called implicit columns and partition columns.)
@@ -274,4 +282,8 @@ public class FileScanFramework extends ManagedScanFramework {
         .build(logger);
     }
   }
+
+  public DrillFileSystem fileSystem() {
+    return dfs;
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
index 5c290d3..d701da4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
@@ -382,8 +382,7 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
    */
   protected EasyFormatPlugin(String name, DrillbitContext context, Configuration fsConf,
       StoragePluginConfig storageConfig, T formatConfig, boolean readable, boolean writable,
-      boolean blockSplittable,
-      boolean compressible, List<String> extensions, String defaultName) {
+      boolean blockSplittable, boolean compressible, List<String> extensions, String defaultName) {
     this.name = name == null ? defaultName : name;
     easyConfig = EasyFormatConfig.builder()
         .matcher(new BasicFormatMatcher(this, fsConf, extensions, compressible))
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapFormatConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapFormatConfig.java
deleted file mode 100644
index 1312151..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapFormatConfig.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.pcap;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonInclude;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-
-import org.apache.drill.common.PlanStringBuilder;
-import org.apache.drill.common.logical.FormatPluginConfig;
-import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
-
-import java.util.List;
-import java.util.Objects;
-
-@JsonTypeName(PcapFormatPlugin.PLUGIN_NAME)
-public class PcapFormatConfig implements FormatPluginConfig {
-  private static final List<String> DEFAULT_EXTNS = ImmutableList.of(PcapFormatPlugin.PLUGIN_NAME);
-
-  private final List<String> extensions;
-  private final boolean sessionizeTCPStreams;
-
-  @JsonCreator
-  public PcapFormatConfig(
-      @JsonProperty("extensions") List<String> extensions,
-      @JsonProperty("sessionizeTCPStreams") Boolean sessionizeTCPStreams) {
-    this.extensions = extensions == null ?
-        DEFAULT_EXTNS : ImmutableList.copyOf(extensions);
-    this.sessionizeTCPStreams = sessionizeTCPStreams == null ? false : sessionizeTCPStreams;
-  }
-
-  @JsonInclude(JsonInclude.Include.NON_DEFAULT)
-  public List<String> getExtensions() {
-    return extensions;
-  }
-
-  @JsonInclude(JsonInclude.Include.NON_DEFAULT)
-  public boolean getSessionizeTCPStreams() {
-    return sessionizeTCPStreams;
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hash(extensions, sessionizeTCPStreams);
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (this == obj) {
-      return true;
-    }
-    if (obj == null || getClass() != obj.getClass()) {
-      return false;
-    }
-    PcapFormatConfig other = (PcapFormatConfig) obj;
-    return Objects.equals(extensions, other.extensions) &&
-           Objects.equals(sessionizeTCPStreams, other.sessionizeTCPStreams);
-  }
-
-  @Override
-  public String toString() {
-    return new PlanStringBuilder(this)
-        .field("extensions", extensions)
-        .field("sessionizeTCPStreams", sessionizeTCPStreams)
-        .toString();
-  }
-}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapFormatPlugin.java
deleted file mode 100644
index 69dbfc5..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapFormatPlugin.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.pcap;
-
-import org.apache.drill.common.types.TypeProtos;
-import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileReaderFactory;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileScanBuilder;
-import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
-import org.apache.drill.exec.server.options.OptionManager;
-import org.apache.drill.exec.store.dfs.easy.EasySubScan;
-import org.apache.drill.common.logical.StoragePluginConfig;
-import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.drill.exec.store.pcap.PcapBatchReader.PcapReaderConfig;
-
-public class PcapFormatPlugin extends EasyFormatPlugin<PcapFormatConfig> {
-
-  public static final String PLUGIN_NAME = "pcap";
-
-  private static class PcapReaderFactory extends FileReaderFactory {
-
-    private final PcapReaderConfig readerConfig;
-    private final int maxRecords;
-
-    public PcapReaderFactory(PcapReaderConfig config, int maxRecords) {
-      readerConfig = config;
-      this.maxRecords = maxRecords;
-    }
-
-    @Override
-    public ManagedReader<? extends FileSchemaNegotiator> newReader() {
-      return new PcapBatchReader(readerConfig, maxRecords);
-    }
-  }
-
-  public PcapFormatPlugin(String name, DrillbitContext context,
-                           Configuration fsConf, StoragePluginConfig storageConfig,
-                           PcapFormatConfig formatConfig) {
-    super(name, easyConfig(fsConf, formatConfig), context, storageConfig, formatConfig);
-  }
-
-  private static EasyFormatConfig easyConfig(Configuration fsConf, PcapFormatConfig pluginConfig) {
-    return EasyFormatConfig.builder()
-        .readable(true)
-        .writable(false)
-        .blockSplittable(false)
-        .compressible(true)
-        .supportsProjectPushdown(true)
-        .extensions(pluginConfig.getExtensions())
-        .fsConf(fsConf)
-        .defaultName(PLUGIN_NAME)
-        .useEnhancedScan(true)
-        .supportsLimitPushdown(true)
-        .build();
-  }
-
-  @Override
-  public ManagedReader<? extends FileSchemaNegotiator> newBatchReader(EasySubScan scan, OptionManager options) {
-    return new PcapBatchReader(new PcapReaderConfig(this), scan.getMaxRecords());
-  }
-
-  @Override
-  protected FileScanBuilder frameworkBuilder(OptionManager options, EasySubScan scan) {
-    FileScanBuilder builder = new FileScanBuilder();
-    builder.setReaderFactory(new PcapReaderFactory(new PcapReaderConfig(this), scan.getMaxRecords()));
-    initScanBuilder(builder, scan);
-    builder.nullType(Types.optional(TypeProtos.MinorType.VARCHAR));
-    return builder;
-  }
-}
diff --git a/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json b/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json
index 4ee29c2..81f63f2 100644
--- a/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json
+++ b/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json
@@ -38,9 +38,6 @@
           "type" : "json",
           "extensions" : [ "json" ]
         },
-        "pcap" : {
-          "type" : "pcap"
-        },
         "avro" : {
           "type" : "avro",
           "extensions" : [ "avro" ]
@@ -98,9 +95,6 @@
           "type" : "json",
           "extensions" : [ "json" ]
         },
-        "pcap" : {
-          "type" : "pcap"
-        },
        "avro" : {
           "type" : "avro"
         },
@@ -135,9 +129,6 @@
           "type" : "json",
           "extensions" : [ "json" ]
         },
-        "pcap" : {
-          "type" : "pcap"
-        },
         "parquet" : {
           "type" : "parquet"
         },
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/TestWithZookeeper.java b/exec/java-exec/src/test/java/org/apache/drill/exec/TestWithZookeeper.java
index a55c7c3..1ceb91e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/TestWithZookeeper.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/TestWithZookeeper.java
@@ -17,6 +17,10 @@
  */
 package org.apache.drill.exec;
 
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryNTimes;
+import org.apache.drill.common.config.DrillConfig;
 import org.junit.After;
 import org.junit.Before;
 
@@ -35,4 +39,17 @@ public class TestWithZookeeper extends ExecTest {
   public void tearDown() throws Exception {
     zkHelper.stopZookeeper();
   }
+
+  protected CuratorFramework createCurator() {
+    String connect = zkHelper.getConnectionString();
+    DrillConfig config = zkHelper.getConfig();
+
+    CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
+            .namespace(zkHelper.getConfig().getString(ExecConstants.ZK_ROOT))
+            .retryPolicy(new RetryNTimes(1, 100))
+            .connectionTimeoutMs(config.getInt(ExecConstants.ZK_TIMEOUT))
+            .connectString(connect);
+
+    return builder.build();
+  }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/FormatPluginSerDeTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/FormatPluginSerDeTest.java
index 6001773..f8dee17 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/FormatPluginSerDeTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/FormatPluginSerDeTest.java
@@ -82,16 +82,6 @@ public class FormatPluginSerDeTest extends PlanTestBase {
   }
 
   @Test
-  public void testPcap() throws Exception {
-    String path = "store/pcap/tcp-1.pcap";
-    dirTestWatcher.copyResourceToRoot(Paths.get(path));
-    testPhysicalPlanSubmission(
-        String.format("select * from dfs.`%s`", path),
-        String.format("select * from table(dfs.`%s`(type=>'pcap'))", path)
-    );
-  }
-
-  @Test
   public void testJson() throws Exception {
     testPhysicalPlanSubmission(
         "select * from cp.`donuts.json`",
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestPStoreProviders.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestPStoreProviders.java
index efc8a7b..cab9877 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestPStoreProviders.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestPStoreProviders.java
@@ -19,8 +19,6 @@ package org.apache.drill.exec.store.sys;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.retry.RetryNTimes;
 import org.apache.drill.categories.SlowTest;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.util.DrillFileUtils;
@@ -144,17 +142,4 @@ public class TestPStoreProviders extends TestWithZookeeper {
       }
     }
   }
-
-  private CuratorFramework createCurator() {
-    String connect = zkHelper.getConnectionString();
-    DrillConfig config = zkHelper.getConfig();
-
-    CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
-      .namespace(zkHelper.getConfig().getString(ExecConstants.ZK_ROOT))
-      .retryPolicy(new RetryNTimes(1, 100))
-      .connectionTimeoutMs(config.getInt(ExecConstants.ZK_TIMEOUT))
-      .connectString(connect);
-
-    return builder.build();
-  }
 }
diff --git a/exec/java-exec/src/test/resources/plugins/mock-plugin-upgrade.json b/exec/java-exec/src/test/resources/plugins/mock-plugin-upgrade.json
index fd86af2..e8709bc 100644
--- a/exec/java-exec/src/test/resources/plugins/mock-plugin-upgrade.json
+++ b/exec/java-exec/src/test/resources/plugins/mock-plugin-upgrade.json
@@ -33,9 +33,6 @@
           "type" : "json",
           "extensions" : [ "json" ]
         },
-        "pcap" : {
-          "type" : "pcap"
-        },
         "avro" : {
           "type" : "avro",
           "extensions" : [ "avro" ]
@@ -147,9 +144,6 @@
           "type" : "json",
           "extensions" : [ "json" ]
         },
-        "pcap" : {
-          "type" : "pcap"
-        },
         "avro" : {
           "type" : "avro",
           "extensions" : [ "avro" ]
diff --git a/exec/java-exec/src/test/resources/store/pcap/data-2.pcap b/exec/java-exec/src/test/resources/store/pcap/data-2.pcap
deleted file mode 100644
index 1f23ee8..0000000
Binary files a/exec/java-exec/src/test/resources/store/pcap/data-2.pcap and /dev/null differ