You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by vi...@apache.org on 2021/04/26 08:19:54 UTC
[drill] branch master updated: DRILL-7828: Refactor Pcap and Pcapng format plugin (#2192)
This is an automated email from the ASF dual-hosted git repository.
vitalii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new 360b080 DRILL-7828: Refactor Pcap and Pcapng format plugin (#2192)
360b080 is described below
commit 360b080ff04703aadfb0085ca6464fb2e85dd6d5
Author: Vitalii Diravka <vi...@apache.org>
AuthorDate: Mon Apr 26 11:19:43 2021 +0300
DRILL-7828: Refactor Pcap and Pcapng format plugin (#2192)
---
.../drill/exec/store/pcap/PcapBatchReader.java | 78 +----------
.../drill/exec/store/pcap/PcapFormatUtils.java | 2 +-
.../drill/exec/store/pcap/decoder/Murmur128.java | 0
.../drill/exec/store/pcap/decoder/Packet.java | 0
.../exec/store/pcap/decoder/PacketConstants.java | 0
.../exec/store/pcap/decoder/PacketDecoder.java | 27 +++-
.../exec/store/pcap/decoder/TcpHandshake.java | 0
.../drill/exec/store/pcap/decoder/TcpSession.java | 0
.../drill/exec/store/pcap/dto/ColumnDto.java | 0
.../apache/drill/exec/store/pcap/package-info.java | 0
.../store/pcap/plugin/BasePcapFormatPlugin.java | 153 +++++++++++++++++++++
.../plugin/PcapFormatConfig.java} | 35 +++--
.../exec/store/pcap/plugin/PcapFormatPlugin.java | 22 ++-
.../plugin}/PcapngFormatConfig.java | 57 ++------
.../exec/store/pcap/plugin/PcapngFormatPlugin.java | 25 ++--
.../drill/exec/store/pcap/schema/PcapTypes.java | 0
.../drill/exec/store/pcap/schema/Schema.java | 0
.../drill/exec/store/pcapng/PcapngBatchReader.java | 5 +-
.../exec/store/pcapng/PcapngFormatPlugin.java | 90 ------------
.../main/resources/bootstrap-format-plugins.json | 15 +-
.../store/pcap/TestPcapWithPersistentStore.java | 84 +++++++++++
.../apache/drill/exec/store/pcap/ConcatPcap.java | 0
.../drill/exec/store/pcap/TestPcapDecoder.java | 4 +-
.../drill/exec/store/pcap/TestPcapEVFReader.java | 11 +-
.../exec/store/pcap/TestPcapRecordReader.java | 41 ++++--
.../drill/exec/store/pcap/TestSessionizePCAP.java | 15 +-
.../exec/store/pcapng/TestPcapngRecordReader.java | 48 ++++++-
.../store/pcapng/TestPcapngStatRecordReader.java | 6 +-
.../src/test/resources/config/oldPcapPlugins.json | 12 ++
.../src/test/resources}/pcap/arpWithNullIP.pcap | Bin
.../src/test/resources}/pcap/data-1.pcap | Bin
.../src/test/resources/pcap/data-2.pcap | Bin 0 -> 1475104 bytes
.../src/test/resources}/pcap/dataFromRemote.txt | 0
.../src/test/resources}/pcap/http.pcap | Bin
.../src/test/resources}/pcap/synscan.pcap | Bin
.../src/test/resources}/pcap/tcp-1.pcap | Bin
.../src/test/resources}/pcap/tcp-2.pcap | Bin
.../src/test/resources}/pcap/testv1.pcap | Bin
.../src/test/resources/pcapng/example.pcap | Bin 0 -> 512 bytes
.../src/test/resources/todo/dhcp.pcapng | Bin 0 -> 1733 bytes
.../src/test/resources/todo/dhcp_big_endian.pcapng | Bin 0 -> 1757 bytes
.../test/resources/todo/dhcp_little_endian.pcapng | Bin 0 -> 1757 bytes
.../src/test/resources/todo/many_interfaces.pcapng | Bin 0 -> 30743 bytes
.../apache/drill/hbase/TestHBaseTableProvider.java | 6 +-
.../impl/scan/columns/ColumnsScanFramework.java | 1 -
.../physical/impl/scan/file/FileScanFramework.java | 18 ++-
.../exec/store/dfs/easy/EasyFormatPlugin.java | 3 +-
.../drill/exec/store/pcap/PcapFormatConfig.java | 83 -----------
.../drill/exec/store/pcap/PcapFormatPlugin.java | 88 ------------
.../main/resources/bootstrap-storage-plugins.json | 9 --
.../org/apache/drill/exec/TestWithZookeeper.java | 17 +++
.../drill/exec/store/FormatPluginSerDeTest.java | 10 --
.../drill/exec/store/sys/TestPStoreProviders.java | 15 --
.../resources/plugins/mock-plugin-upgrade.json | 6 -
.../src/test/resources/store/pcap/data-2.pcap | Bin 1167896 -> 0 bytes
55 files changed, 481 insertions(+), 505 deletions(-)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapBatchReader.java b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/PcapBatchReader.java
similarity index 95%
rename from exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapBatchReader.java
rename to contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/PcapBatchReader.java
index 83e2115..fd7fef9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapBatchReader.java
+++ b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/PcapBatchReader.java
@@ -28,6 +28,7 @@ import org.apache.drill.exec.store.pcap.decoder.Packet;
import org.apache.drill.exec.store.pcap.decoder.PacketDecoder;
import org.apache.drill.exec.store.pcap.decoder.TcpSession;
import org.apache.drill.exec.store.pcap.schema.Schema;
+import org.apache.drill.exec.store.pcap.plugin.PcapFormatConfig;
import org.apache.drill.exec.vector.accessor.ScalarWriter;
import org.apache.hadoop.mapred.FileSplit;
import org.slf4j.Logger;
@@ -48,126 +49,61 @@ public class PcapBatchReader implements ManagedReader<FileSchemaNegotiator> {
private static final Logger logger = LoggerFactory.getLogger(PcapBatchReader.class);
private FileSplit split;
-
private PacketDecoder decoder;
-
private InputStream fsStream;
-
private RowSetLoader rowWriter;
-
private int validBytes;
-
private byte[] buffer;
-
private int offset;
-
private ScalarWriter typeWriter;
-
private ScalarWriter timestampWriter;
-
private ScalarWriter timestampMicroWriter;
-
private ScalarWriter networkWriter;
-
private ScalarWriter srcMacAddressWriter;
-
private ScalarWriter dstMacAddressWriter;
-
private ScalarWriter dstIPWriter;
-
private ScalarWriter srcIPWriter;
-
private ScalarWriter srcPortWriter;
-
private ScalarWriter dstPortWriter;
-
private ScalarWriter packetLengthWriter;
-
private ScalarWriter tcpSessionWriter;
-
private ScalarWriter tcpSequenceWriter;
-
private ScalarWriter tcpAckNumberWriter;
-
private ScalarWriter tcpFlagsWriter;
-
private ScalarWriter tcpParsedFlagsWriter;
-
private ScalarWriter tcpNsWriter;
-
private ScalarWriter tcpCwrWriter;
-
private ScalarWriter tcpEceWriter;
-
private ScalarWriter tcpFlagsEceEcnCapableWriter;
-
private ScalarWriter tcpFlagsCongestionWriter;
-
private ScalarWriter tcpUrgWriter;
-
private ScalarWriter tcpAckWriter;
-
private ScalarWriter tcpPshWriter;
-
private ScalarWriter tcpRstWriter;
-
private ScalarWriter tcpSynWriter;
-
private ScalarWriter tcpFinWriter;
-
private ScalarWriter dataWriter;
-
private ScalarWriter isCorruptWriter;
-
- private final PcapReaderConfig readerConfig;
-
-
+ private final PcapFormatConfig readerConfig;
// Writers for TCP Sessions
private ScalarWriter sessionStartTimeWriter;
-
private ScalarWriter sessionEndTimeWriter;
-
private ScalarWriter sessionDurationWriter;
-
private ScalarWriter connectionTimeWriter;
-
private ScalarWriter packetCountWriter;
-
private ScalarWriter originPacketCounterWriter;
-
private ScalarWriter remotePacketCounterWriter;
-
private ScalarWriter originDataVolumeWriter;
-
private ScalarWriter remoteDataVolumeWriter;
-
private ScalarWriter hostDataWriter;
-
private ScalarWriter remoteDataWriter;
-
private final int maxRecords;
-
private Map<Long, TcpSession> sessionQueue;
- public static class PcapReaderConfig {
-
- protected final PcapFormatPlugin plugin;
-
- public boolean sessionizeTCPStreams;
-
- private final PcapFormatConfig config;
-
- public PcapReaderConfig(PcapFormatPlugin plugin) {
- this.plugin = plugin;
- this.config = plugin.getConfig();
- this.sessionizeTCPStreams = config.getSessionizeTCPStreams();
- }
- }
-
- public PcapBatchReader(PcapReaderConfig readerConfig, int maxRecords) {
+ public PcapBatchReader(PcapFormatConfig readerConfig, int maxRecords) {
this.readerConfig = readerConfig;
- if (readerConfig.sessionizeTCPStreams) {
+ if (readerConfig.getSessionizeTCPStreams()) {
sessionQueue = new HashMap<>();
}
this.maxRecords = maxRecords;
@@ -178,7 +114,7 @@ public class PcapBatchReader implements ManagedReader<FileSchemaNegotiator> {
split = negotiator.split();
openFile(negotiator);
SchemaBuilder builder = new SchemaBuilder();
- Schema pcapSchema = new Schema(readerConfig.sessionizeTCPStreams);
+ Schema pcapSchema = new Schema(readerConfig.getSessionizeTCPStreams());
TupleMetadata schema = pcapSchema.buildSchema(builder);
negotiator.tableSchema(schema, false);
ResultSetLoader loader = negotiator.build();
@@ -238,7 +174,7 @@ public class PcapBatchReader implements ManagedReader<FileSchemaNegotiator> {
}
private void populateColumnWriters(RowSetLoader rowWriter) {
- if (readerConfig.sessionizeTCPStreams) {
+ if (readerConfig.getSessionizeTCPStreams()) {
srcMacAddressWriter = rowWriter.scalar("src_mac_address");
dstMacAddressWriter = rowWriter.scalar("dst_mac_address");
dstIPWriter = rowWriter.scalar("dst_ip");
@@ -323,7 +259,7 @@ public class PcapBatchReader implements ManagedReader<FileSchemaNegotiator> {
}
// If we are resessionizing the TCP Stream, add the packet to the stream
- if (readerConfig.sessionizeTCPStreams) {
+ if (readerConfig.getSessionizeTCPStreams()) {
// If the session has not been seen before, add it to the queue
long sessionID = packet.getSessionHash();
if (!sessionQueue.containsKey(sessionID)) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapFormatUtils.java b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/PcapFormatUtils.java
similarity index 97%
rename from exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapFormatUtils.java
rename to contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/PcapFormatUtils.java
index eb9ea05..6c23159 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapFormatUtils.java
+++ b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/PcapFormatUtils.java
@@ -40,7 +40,7 @@ public class PcapFormatUtils {
/**
*
- * @param byteOrder true for forward file order, false fore revers file order
+ * @param byteOrder true for forward file order, false for reverse file order
* @param buf byte buffer
* @param offset buffer offset
* @return short value as int of specific bytes from buffer
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/Murmur128.java b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/decoder/Murmur128.java
similarity index 100%
rename from exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/Murmur128.java
rename to contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/decoder/Murmur128.java
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/Packet.java b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/decoder/Packet.java
similarity index 100%
rename from exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/Packet.java
rename to contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/decoder/Packet.java
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/PacketConstants.java b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/decoder/PacketConstants.java
similarity index 100%
rename from exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/PacketConstants.java
rename to contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/decoder/PacketConstants.java
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/PacketDecoder.java b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/decoder/PacketDecoder.java
similarity index 87%
rename from exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/PacketDecoder.java
rename to contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/decoder/PacketDecoder.java
index 8e6d867..977e465 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/PacketDecoder.java
+++ b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/decoder/PacketDecoder.java
@@ -59,12 +59,15 @@ public class PacketDecoder {
private static final int GLOBAL_HEADER_SIZE = 24;
private static final int PCAP_MAGIC_LITTLE_ENDIAN = 0xD4C3B2A1;
private static final int PCAP_MAGIC_NUMBER = 0xA1B2C3D4;
+ private static final int PCAPNG_MAGIC_LITTLE_ENDIAN = 0x4D3C2B1A;
+ private static final int PCAPNG_MAGIC_NUMBER = 0x0A0D0D0A;
private static final Logger logger = LoggerFactory.getLogger(PacketDecoder.class);
private final int maxLength;
private final int network;
private boolean bigEndian;
+ private FileFormat fileFormat;
private InputStream input;
@@ -78,16 +81,28 @@ public class PacketDecoder {
switch (getInt(globalHeader, 0)) {
case PCAP_MAGIC_NUMBER:
bigEndian = true;
+ fileFormat = FileFormat.PCAP;
break;
case PCAP_MAGIC_LITTLE_ENDIAN:
bigEndian = false;
+ fileFormat = FileFormat.PCAP;
+ break;
+ case PCAPNG_MAGIC_NUMBER:
+ bigEndian = true;
+ fileFormat = FileFormat.PCAPNG;
+ break;
+ case PCAPNG_MAGIC_LITTLE_ENDIAN:
+ bigEndian = false;
+ fileFormat = FileFormat.PCAPNG;
break;
default:
//noinspection ConstantConditions
Preconditions.checkState(false,
String.format("Bad magic number = %08x", getIntFileOrder(bigEndian, globalHeader, 0)));
}
- Preconditions.checkState(getShortFileOrder(bigEndian, globalHeader, 4) == 2, "Wanted major version == 2");
+ if(fileFormat == FileFormat.PCAP) {
+ Preconditions.checkState(getShortFileOrder(bigEndian, globalHeader, 4) == 2, "Wanted major version == 2");
+ } // todo: pcapng major version == 1 precondition
maxLength = getIntFileOrder(bigEndian, globalHeader, 16);
network = getIntFileOrder(bigEndian, globalHeader, 20);
}
@@ -116,6 +131,10 @@ public class PacketDecoder {
return bigEndian;
}
+ public FileFormat getFileFormat() {
+ return fileFormat;
+ }
+
public Packet nextPacket() throws IOException {
Packet r = new Packet();
if (r.readPcap(input, bigEndian, maxLength)) {
@@ -124,4 +143,10 @@ public class PacketDecoder {
return null;
}
}
+
+ public enum FileFormat {
+ PCAP,
+ PCAPNG,
+ UNKNOWN
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/TcpHandshake.java b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/decoder/TcpHandshake.java
similarity index 100%
rename from exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/TcpHandshake.java
rename to contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/decoder/TcpHandshake.java
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/TcpSession.java b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/decoder/TcpSession.java
similarity index 100%
rename from exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/TcpSession.java
rename to contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/decoder/TcpSession.java
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/dto/ColumnDto.java b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/dto/ColumnDto.java
similarity index 100%
rename from exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/dto/ColumnDto.java
rename to contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/dto/ColumnDto.java
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/package-info.java b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/package-info.java
similarity index 100%
rename from exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/package-info.java
rename to contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/package-info.java
diff --git a/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/plugin/BasePcapFormatPlugin.java b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/plugin/BasePcapFormatPlugin.java
new file mode 100644
index 0000000..4052245
--- /dev/null
+++ b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/plugin/BasePcapFormatPlugin.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.pcap.plugin;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileReaderFactory;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileScanBuilder;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedScanFramework;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import org.apache.drill.exec.store.pcap.PcapBatchReader;
+import org.apache.drill.exec.store.pcap.decoder.PacketDecoder;
+import org.apache.drill.exec.store.pcapng.PcapngBatchReader;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+public abstract class BasePcapFormatPlugin<T extends PcapFormatConfig> extends EasyFormatPlugin<T> {
+
+ static final Logger logger = LoggerFactory.getLogger(ManagedScanFramework.class);
+ private static PacketDecoder.FileFormat fileFormat = PacketDecoder.FileFormat.UNKNOWN;
+
+ public BasePcapFormatPlugin(String name,
+ DrillbitContext context,
+ Configuration fsConf,
+ StoragePluginConfig storageConfig,
+ T formatConfig) {
+ super(name, easyConfig(fsConf, formatConfig), context, storageConfig, formatConfig);
+ }
+
+ private static EasyFormatConfig easyConfig(Configuration fsConf, PcapFormatConfig pluginConfig) {
+ return EasyFormatConfig.builder()
+ .readable(true)
+ .writable(false)
+ .blockSplittable(false)
+ .compressible(true)
+ .extensions(pluginConfig.getExtensions())
+ .fsConf(fsConf)
+ .useEnhancedScan(true)
+ .supportsLimitPushdown(true)
+ .supportsProjectPushdown(true)
+ .defaultName(PcapFormatConfig.NAME)
+ .build();
+ }
+
+ private static class PcapReaderFactory extends FileReaderFactory {
+
+ private final PcapFormatConfig config;
+ private final EasySubScan scan;
+
+ public PcapReaderFactory(PcapFormatConfig config, EasySubScan scan) {
+ this.config = config;
+ this.scan = scan;
+ }
+
+ /**
+ * Reader creator. If file format can't be detected try to use default PCAP format plugin
+ *
+ * @return PCAP or PCAPNG batch reader
+ */
+ @Override
+ public ManagedReader<? extends FileSchemaNegotiator> newReader() {
+ if (fileFramework().isPresent()) { // todo: can be simplified with java9 ifPresentOrElse
+ Path path = scan.getWorkUnits().stream()
+ .findFirst()
+ .orElseThrow(() -> UserException.
+ dataReadError()
+ .addContext("There are no files for scanning")
+ .build(logger))
+ .getPath();
+ fileFormat = getFileFormat(fileFramework().get().fileSystem(), path);
+ if (config.getExtensions().stream()
+ .noneMatch(f -> f.equals(fileFormat.name().toLowerCase()))) {
+ logger.error("File format {} is not within plugin extensions: {}. Trying to use default PCAP format plugin to " +
+ "read the file", fileFormat, config.getExtensions());
+ }
+ } else {
+ logger.error("It is not possible to detect file format, because the File Framework is not initialized. " +
+ "Trying to use default PCAP format plugin to read the file");
+ }
+ return createReader(scan, config);
+ }
+ }
+
+ @Override
+ public ManagedReader<? extends FileSchemaNegotiator> newBatchReader(EasySubScan scan, OptionManager options) {
+ return createReader(scan, formatConfig);
+ }
+
+ private static ManagedReader<? extends FileSchemaNegotiator> createReader(EasySubScan scan, PcapFormatConfig config) {
+ switch(fileFormat) {
+ case PCAPNG: return new PcapngBatchReader(config, scan);
+ case PCAP:
+ case UNKNOWN:
+ default: return new PcapBatchReader(config, scan.getMaxRecords());
+ }
+ }
+
+ @Override
+ protected FileScanBuilder frameworkBuilder(OptionManager options, EasySubScan scan) {
+ FileScanBuilder builder = new FileScanBuilder();
+ builder.setReaderFactory(new PcapReaderFactory(formatConfig, scan));
+
+ initScanBuilder(builder, scan);
+ builder.nullType(Types.optional(MinorType.VARCHAR));
+ return builder;
+ }
+
+ /**
+ * Helper method to detect PCAP or PCAPNG file format based on file Magic Number
+ *
+ * @param dfs for obtaining InputStream
+ * @return PCAP/PCAPNG file format
+ */
+ private static PacketDecoder.FileFormat getFileFormat(DrillFileSystem dfs, Path path) {
+ try (InputStream inputStream = dfs.openPossiblyCompressedStream(path)) {
+ PacketDecoder decoder = new PacketDecoder(inputStream);
+ return decoder.getFileFormat();
+ } catch (IOException io) {
+ throw UserException
+ .dataReadError(io)
+ .addContext("File name:", path.toString())
+ .build(logger);
+ }
+ }
+}
diff --git a/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatConfig.java b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/plugin/PcapFormatConfig.java
similarity index 59%
copy from contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatConfig.java
copy to contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/plugin/PcapFormatConfig.java
index 7210f93..b5f8d53 100644
--- a/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatConfig.java
+++ b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/plugin/PcapFormatConfig.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.drill.exec.store.pcapng;
+package org.apache.drill.exec.store.pcap.plugin;
import java.util.List;
import java.util.Objects;
@@ -29,18 +29,23 @@ import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
-@JsonTypeName(PcapngFormatConfig.NAME)
+@JsonTypeName(PcapFormatConfig.NAME)
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
-public class PcapngFormatConfig implements FormatPluginConfig {
+public class PcapFormatConfig implements FormatPluginConfig {
+ private static final List<String> DEFAULT_EXTNS = ImmutableList.of("pcap");
- public static final String NAME = "pcapng";
+ public static final String NAME = "pcap";
private final List<String> extensions;
private final boolean stat;
+ private final boolean sessionizeTCPStreams;
@JsonCreator
- public PcapngFormatConfig(@JsonProperty("extensions") List<String> extensions, @JsonProperty("stat") boolean stat) {
- this.extensions = extensions == null ? ImmutableList.of(PcapngFormatConfig.NAME) : ImmutableList.copyOf(extensions);
+ public PcapFormatConfig(@JsonProperty("extensions") List<String> extensions,
+ @JsonProperty("stat") boolean stat,
+ @JsonProperty("sessionizeTCPStreams") Boolean sessionizeTCPStreams) {
+ this.extensions = extensions == null ? DEFAULT_EXTNS : ImmutableList.copyOf(extensions);
this.stat = stat;
+ this.sessionizeTCPStreams = sessionizeTCPStreams != null && sessionizeTCPStreams;
}
@JsonProperty("extensions")
@@ -53,6 +58,11 @@ public class PcapngFormatConfig implements FormatPluginConfig {
return this.stat;
}
+ @JsonProperty("sessionizeTCPStreams")
+ public boolean getSessionizeTCPStreams() {
+ return sessionizeTCPStreams;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -61,17 +71,22 @@ public class PcapngFormatConfig implements FormatPluginConfig {
if (o == null || getClass() != o.getClass()) {
return false;
}
- PcapngFormatConfig that = (PcapngFormatConfig) o;
- return Objects.equals(extensions, that.extensions) && Objects.equals(stat, that.getStat());
+ PcapFormatConfig that = (PcapFormatConfig) o;
+ return Objects.equals(extensions, that.extensions) && Objects.equals(stat, that.getStat()) &&
+ Objects.equals(sessionizeTCPStreams, that.sessionizeTCPStreams);
}
@Override
public int hashCode() {
- return Objects.hash(extensions, stat);
+ return Objects.hash(extensions, stat, sessionizeTCPStreams);
}
@Override
public String toString() {
- return new PlanStringBuilder(this).field("extensions", extensions).field("stat", stat).toString();
+ return new PlanStringBuilder(this)
+ .field("extensions", extensions)
+ .field("stat", stat)
+ .field("sessionizeTCPStreams", sessionizeTCPStreams)
+ .toString();
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalPStoreProvider.java b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/plugin/PcapFormatPlugin.java
similarity index 55%
rename from exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalPStoreProvider.java
rename to contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/plugin/PcapFormatPlugin.java
index a078a3e..34ba5c2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalPStoreProvider.java
+++ b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/plugin/PcapFormatPlugin.java
@@ -15,20 +15,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.drill.exec.store.sys.local;
+package org.apache.drill.exec.store.pcap.plugin;
-import org.apache.drill.exec.exception.StoreException;
-import org.apache.drill.exec.store.sys.PersistentStoreRegistry;
-import org.apache.drill.exec.store.sys.store.provider.LocalPersistentStoreProvider;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.hadoop.conf.Configuration;
-/**
- * Kept for possible references to old class name in configuration.
- *
- * @deprecated will be removed in 1.7
- * use {@link org.apache.drill.exec.store.sys.store.provider.LocalPersistentStoreProvider} instead.
- */
-public class LocalPStoreProvider extends LocalPersistentStoreProvider {
- public LocalPStoreProvider(PersistentStoreRegistry registry) throws StoreException {
- super(registry);
+public class PcapFormatPlugin extends BasePcapFormatPlugin<PcapFormatConfig> {
+
+ public PcapFormatPlugin(String name, DrillbitContext context, Configuration fsConf,
+ StoragePluginConfig storageConfig, PcapFormatConfig formatConfig) {
+ super(name, context, fsConf, storageConfig, formatConfig);
}
}
diff --git a/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatConfig.java b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/plugin/PcapngFormatConfig.java
similarity index 51%
rename from contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatConfig.java
rename to contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/plugin/PcapngFormatConfig.java
index 7210f93..082840a 100644
--- a/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatConfig.java
+++ b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/plugin/PcapngFormatConfig.java
@@ -15,63 +15,26 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.drill.exec.store.pcapng;
-
-import java.util.List;
-import java.util.Objects;
-
-import org.apache.drill.common.PlanStringBuilder;
-import org.apache.drill.common.logical.FormatPluginConfig;
-import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+package org.apache.drill.exec.store.pcap.plugin;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+
+import java.util.List;
@JsonTypeName(PcapngFormatConfig.NAME)
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
-public class PcapngFormatConfig implements FormatPluginConfig {
-
+@Deprecated // for backward compatibility
+public class PcapngFormatConfig extends PcapFormatConfig {
+ private static final List<String> DEFAULT_EXTNS = ImmutableList.of("pcapng");
public static final String NAME = "pcapng";
- private final List<String> extensions;
- private final boolean stat;
@JsonCreator
- public PcapngFormatConfig(@JsonProperty("extensions") List<String> extensions, @JsonProperty("stat") boolean stat) {
- this.extensions = extensions == null ? ImmutableList.of(PcapngFormatConfig.NAME) : ImmutableList.copyOf(extensions);
- this.stat = stat;
- }
-
- @JsonProperty("extensions")
- public List<String> getExtensions() {
- return extensions;
- }
-
- @JsonProperty("stat")
- public boolean getStat() {
- return this.stat;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- PcapngFormatConfig that = (PcapngFormatConfig) o;
- return Objects.equals(extensions, that.extensions) && Objects.equals(stat, that.getStat());
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(extensions, stat);
- }
-
- @Override
- public String toString() {
- return new PlanStringBuilder(this).field("extensions", extensions).field("stat", stat).toString();
+ public PcapngFormatConfig(@JsonProperty("extensions") List<String> extensions,
+ @JsonProperty("stat") boolean stat) {
+ super(extensions == null ? DEFAULT_EXTNS : ImmutableList.copyOf(extensions), stat, null);
}
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/TestWithZookeeper.java b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/plugin/PcapngFormatPlugin.java
similarity index 58%
copy from exec/java-exec/src/test/java/org/apache/drill/exec/TestWithZookeeper.java
copy to contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/plugin/PcapngFormatPlugin.java
index a55c7c3..a57e429 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/TestWithZookeeper.java
+++ b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/plugin/PcapngFormatPlugin.java
@@ -15,24 +15,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.drill.exec;
+package org.apache.drill.exec.store.pcap.plugin;
-import org.junit.After;
-import org.junit.Before;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.hadoop.conf.Configuration;
-public class TestWithZookeeper extends ExecTest {
-// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestWithZookeeper.class);
+@Deprecated // for backward compatibility
+public class PcapngFormatPlugin extends BasePcapFormatPlugin<PcapngFormatConfig> {
- protected ZookeeperHelper zkHelper;
-
- @Before
- public void setUp() throws Exception {
- zkHelper = new ZookeeperHelper();
- zkHelper.startZookeeper(1);
- }
-
- @After
- public void tearDown() throws Exception {
- zkHelper.stopZookeeper();
+ public PcapngFormatPlugin(String name, DrillbitContext context, Configuration fsConf,
+ StoragePluginConfig storageConfig, PcapngFormatConfig formatConfig) {
+ super(name, context, fsConf, storageConfig, formatConfig);
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/schema/PcapTypes.java b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/schema/PcapTypes.java
similarity index 100%
rename from exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/schema/PcapTypes.java
rename to contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/schema/PcapTypes.java
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/schema/Schema.java b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/schema/Schema.java
similarity index 100%
rename from exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/schema/Schema.java
rename to contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/schema/Schema.java
diff --git a/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcapng/PcapngBatchReader.java b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcapng/PcapngBatchReader.java
index eeabebf..e8367c5 100644
--- a/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcapng/PcapngBatchReader.java
+++ b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcapng/PcapngBatchReader.java
@@ -40,6 +40,7 @@ import org.apache.drill.exec.record.metadata.SchemaBuilder;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import org.apache.drill.exec.store.pcap.plugin.PcapFormatConfig;
import org.apache.drill.exec.util.Utilities;
import org.apache.drill.exec.vector.accessor.ScalarWriter;
import org.apache.hadoop.fs.Path;
@@ -54,7 +55,7 @@ public class PcapngBatchReader implements ManagedReader<FileSchemaNegotiator> {
private static final Logger logger = LoggerFactory.getLogger(PcapngBatchReader.class);
- private final PcapngFormatConfig config;
+ private final PcapFormatConfig config;
private final EasySubScan scan;
private final int maxRecords;
private CustomErrorContext errorContext;
@@ -66,7 +67,7 @@ public class PcapngBatchReader implements ManagedReader<FileSchemaNegotiator> {
private InputStream in;
private Path path;
- public PcapngBatchReader(final PcapngFormatConfig config, final EasySubScan scan) {
+ public PcapngBatchReader(final PcapFormatConfig config, final EasySubScan scan) {
this.config = config;
this.scan = scan;
this.maxRecords = scan.getMaxRecords();
diff --git a/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatPlugin.java b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatPlugin.java
deleted file mode 100644
index 0cccd6b..0000000
--- a/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatPlugin.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.pcapng;
-
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.logical.StoragePluginConfig;
-import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileReaderFactory;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileScanBuilder;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
-import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
-import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.server.options.OptionManager;
-import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
-import org.apache.drill.exec.store.dfs.easy.EasySubScan;
-import org.apache.hadoop.conf.Configuration;
-
-public class PcapngFormatPlugin extends EasyFormatPlugin<PcapngFormatConfig> {
-
- public PcapngFormatPlugin(String name,
- DrillbitContext context,
- Configuration fsConf,
- StoragePluginConfig storageConfig,
- PcapngFormatConfig formatConfig) {
- super(name, easyConfig(fsConf, formatConfig), context, storageConfig, formatConfig);
- }
-
- private static EasyFormatConfig easyConfig(Configuration fsConf, PcapngFormatConfig pluginConfig) {
- return EasyFormatConfig.builder()
- .readable(true)
- .writable(false)
- .blockSplittable(false)
- .compressible(true)
- .extensions(pluginConfig.getExtensions())
- .fsConf(fsConf)
- .useEnhancedScan(true)
- .supportsLimitPushdown(true)
- .supportsProjectPushdown(true)
- .defaultName(PcapngFormatConfig.NAME)
- .build();
- }
-
- private static class PcapngReaderFactory extends FileReaderFactory {
-
- private final PcapngFormatConfig config;
- private final EasySubScan scan;
-
- public PcapngReaderFactory(PcapngFormatConfig config, EasySubScan scan) {
- this.config = config;
- this.scan = scan;
- }
-
- @Override
- public ManagedReader<? extends FileSchemaNegotiator> newReader() {
- return new PcapngBatchReader(config, scan);
- }
- }
-
- @Override
- public ManagedReader<? extends FileSchemaNegotiator> newBatchReader(EasySubScan scan, OptionManager options)
- throws ExecutionSetupException {
- return new PcapngBatchReader(formatConfig, scan);
- }
-
- @Override
- protected FileScanBuilder frameworkBuilder(OptionManager options, EasySubScan scan) throws ExecutionSetupException {
- FileScanBuilder builder = new FileScanBuilder();
- builder.setReaderFactory(new PcapngReaderFactory(formatConfig, scan));
-
- initScanBuilder(builder, scan);
- builder.nullType(Types.optional(MinorType.VARCHAR));
- return builder;
- }
-}
diff --git a/contrib/format-pcapng/src/main/resources/bootstrap-format-plugins.json b/contrib/format-pcapng/src/main/resources/bootstrap-format-plugins.json
index 60b04d1..550e6e5 100644
--- a/contrib/format-pcapng/src/main/resources/bootstrap-format-plugins.json
+++ b/contrib/format-pcapng/src/main/resources/bootstrap-format-plugins.json
@@ -3,8 +3,9 @@
"dfs": {
"type": "file",
"formats": {
- "pcapng": {
- "type": "pcapng",
+ "pcap": {
+ "type": "pcap",
+ "extensions": ["pcap", "pcapng"],
"stat" : false
}
}
@@ -12,8 +13,9 @@
"cp": {
"type": "file",
"formats": {
- "pcapng": {
- "type": "pcapng",
+ "pcap": {
+ "type": "pcap",
+ "extensions": ["pcap", "pcapng"],
"stat" : false
}
}
@@ -21,8 +23,9 @@
"s3": {
"type": "file",
"formats": {
- "pcapng": {
- "type": "pcapng",
+ "pcap": {
+ "type": "pcap",
+ "extensions": ["pcap", "pcapng"],
"stat" : false
}
}
diff --git a/contrib/format-pcapng/src/test/java/org/apache/drill/exec/persistent/store/pcap/TestPcapWithPersistentStore.java b/contrib/format-pcapng/src/test/java/org/apache/drill/exec/persistent/store/pcap/TestPcapWithPersistentStore.java
new file mode 100644
index 0000000..1328455
--- /dev/null
+++ b/contrib/format-pcapng/src/test/java/org/apache/drill/exec/persistent/store/pcap/TestPcapWithPersistentStore.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.persistent.store.pcap;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.common.util.DrillFileUtils;
+import org.apache.drill.exec.TestWithZookeeper;
+import org.apache.drill.exec.coord.zk.PathUtils;
+import org.apache.drill.exec.coord.zk.ZookeeperClient;
+import org.apache.drill.exec.store.dfs.FileSystemConfig;
+import org.apache.drill.exec.store.pcap.plugin.PcapFormatConfig;
+import org.apache.drill.exec.store.pcap.plugin.PcapngFormatConfig;
+import org.apache.drill.exec.store.sys.PersistentStore;
+import org.apache.drill.exec.store.sys.PersistentStoreConfig;
+import org.apache.drill.exec.store.sys.store.ZookeeperPersistentStore;
+import org.apache.drill.exec.store.sys.store.provider.ZookeeperPersistentStoreProvider;
+import org.apache.zookeeper.CreateMode;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Map;
+
+import static org.junit.Assert.assertTrue;
+
+public class TestPcapWithPersistentStore extends TestWithZookeeper {
+ /**
+ * DRILL-7828
+ * Note: If this test breaks you are probably breaking backward and forward compatibility. Verify with the community
+ * that breaking compatibility is acceptable and planned for.
+ */
+ @Test
+ public void pcapPluginBackwardCompatabilityTest() throws Exception {
+ final String oldPlugin = "oldFormatPlugin";
+
+ try (CuratorFramework curator = createCurator()) {
+ curator.start();
+ ObjectMapper objectMapper = new ObjectMapper();
+ objectMapper.registerSubtypes(PcapFormatConfig.class, PcapngFormatConfig.class);
+ PersistentStoreConfig<FileSystemConfig> storeConfig =
+ PersistentStoreConfig.newJacksonBuilder(objectMapper, FileSystemConfig.class).name("type").build();
+
+
+ try (ZookeeperClient zkClient = new ZookeeperClient(curator,
+ PathUtils.join("/", storeConfig.getName()), CreateMode.PERSISTENT)) {
+ zkClient.start();
+ String oldFormatPlugin = DrillFileUtils.getResourceAsString("/config/oldPcapPlugins.json");
+ zkClient.put(oldPlugin, oldFormatPlugin.getBytes(), null);
+ }
+
+ try (ZookeeperPersistentStoreProvider provider =
+ new ZookeeperPersistentStoreProvider(zkHelper.getConfig(), curator)) {
+ PersistentStore<FileSystemConfig> store = provider.getOrCreateStore(storeConfig);
+ assertTrue(store instanceof ZookeeperPersistentStore);
+
+ FileSystemConfig oldPluginConfig = ((ZookeeperPersistentStore<FileSystemConfig>)store).get(oldPlugin, null);
+ Map<String, FormatPluginConfig> formats = oldPluginConfig.getFormats();
+ Assert.assertEquals(formats.keySet(), ImmutableSet.of("pcap", "pcapng"));
+ PcapFormatConfig pcap = (PcapFormatConfig) formats.get("pcap");
+ PcapngFormatConfig pcapng = (PcapngFormatConfig) formats.get("pcapng");
+ Assert.assertEquals(pcap.getExtensions(), ImmutableList.of("pcap"));
+ assertTrue(pcapng.getStat());
+ }
+ }
+ }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/ConcatPcap.java b/contrib/format-pcapng/src/test/java/org/apache/drill/exec/store/pcap/ConcatPcap.java
similarity index 100%
rename from exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/ConcatPcap.java
rename to contrib/format-pcapng/src/test/java/org/apache/drill/exec/store/pcap/ConcatPcap.java
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapDecoder.java b/contrib/format-pcapng/src/test/java/org/apache/drill/exec/store/pcap/TestPcapDecoder.java
similarity index 98%
rename from exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapDecoder.java
rename to contrib/format-pcapng/src/test/java/org/apache/drill/exec/store/pcap/TestPcapDecoder.java
index 1a29902..d171c31 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapDecoder.java
+++ b/contrib/format-pcapng/src/test/java/org/apache/drill/exec/store/pcap/TestPcapDecoder.java
@@ -57,7 +57,7 @@ public class TestPcapDecoder extends BaseTestQuery {
@Test
public void testBasics() throws IOException {
- InputStream in = Resources.getResource("store/pcap/tcp-2.pcap").openStream();
+ InputStream in = Resources.getResource("pcap/tcp-2.pcap").openStream();
PacketDecoder pd = new PacketDecoder(in);
Packet p = pd.packet();
int offset = 0;
@@ -226,7 +226,7 @@ public class TestPcapDecoder extends BaseTestQuery {
// might be faster to keep this open and rewind each time, but
// that is hard to do with a resource, especially if it comes
// from the class path instead of files.
- try (InputStream in = Resources.getResource("store/pcap/tcp-2.pcap").openStream()) {
+ try (InputStream in = Resources.getResource("pcap/tcp-2.pcap").openStream()) {
ConcatPcap.copy(first, in, out);
}
first = false;
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapEVFReader.java b/contrib/format-pcapng/src/test/java/org/apache/drill/exec/store/pcap/TestPcapEVFReader.java
similarity index 92%
rename from exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapEVFReader.java
rename to contrib/format-pcapng/src/test/java/org/apache/drill/exec/store/pcap/TestPcapEVFReader.java
index 5796fa8..9a27276 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapEVFReader.java
+++ b/contrib/format-pcapng/src/test/java/org/apache/drill/exec/store/pcap/TestPcapEVFReader.java
@@ -19,6 +19,7 @@
package org.apache.drill.exec.store.pcap;
import org.apache.drill.categories.RowSetTests;
+import org.apache.drill.exec.store.pcap.plugin.PcapFormatConfig;
import org.apache.drill.test.ClusterFixture;
import org.apache.drill.test.ClusterTest;
import org.junit.BeforeClass;
@@ -34,12 +35,12 @@ public class TestPcapEVFReader extends ClusterTest {
@BeforeClass
public static void setup() throws Exception {
ClusterTest.startCluster(ClusterFixture.builder(dirTestWatcher));
- cluster.defineFormat("cp", "sample", new PcapFormatConfig(null, null));
+ cluster.defineFormat("cp", "sample", new PcapFormatConfig(null, true, false));
}
@Test
public void testStarQuery() throws Exception {
- String sql = "SELECT * FROM cp.`store/pcap/synscan.pcap` LIMIT 1";
+ String sql = "SELECT * FROM cp.`pcap/synscan.pcap` LIMIT 1";
testBuilder()
.sqlQuery(sql)
@@ -58,7 +59,7 @@ public class TestPcapEVFReader extends ClusterTest {
"packet_length, tcp_session, " +
"tcp_sequence, tcp_ack, tcp_flags," +
" tcp_parsed_flags, tcp_flags_ns, tcp_flags_cwr, tcp_flags_ece, tcp_flags_ece_ecn_capable, tcp_flags_ece_congestion_experienced, tcp_flags_urg, tcp_flags_ack, tcp_flags_psh, tcp_flags_rst, tcp_flags_syn," +
- " tcp_flags_fin, data, is_corrupt FROM cp.`store/pcap/synscan.pcap` LIMIT 1";
+ " tcp_flags_fin, data, is_corrupt FROM cp.`pcap/synscan.pcap` LIMIT 1";
testBuilder()
.sqlQuery(sql)
@@ -72,7 +73,7 @@ public class TestPcapEVFReader extends ClusterTest {
@Test
public void testAggregateQuery() throws Exception {
- String sql = "SELECT is_corrupt, COUNT(*) as packet_count FROM cp.`store/pcap/testv1.pcap` GROUP BY is_corrupt ORDER BY packet_count DESC";
+ String sql = "SELECT is_corrupt, COUNT(*) as packet_count FROM cp.`pcap/testv1.pcap` GROUP BY is_corrupt ORDER BY packet_count DESC";
testBuilder()
.sqlQuery(sql)
@@ -85,7 +86,7 @@ public class TestPcapEVFReader extends ClusterTest {
@Test
public void testArpPcapFile() throws Exception {
- String sql = "SELECT src_ip, dst_ip FROM cp.`store/pcap/arpWithNullIP.pcap` WHERE src_port=1";
+ String sql = "SELECT src_ip, dst_ip FROM cp.`pcap/arpWithNullIP.pcap` WHERE src_port=1";
testBuilder()
.sqlQuery(sql)
.ordered()
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapRecordReader.java b/contrib/format-pcapng/src/test/java/org/apache/drill/exec/store/pcap/TestPcapRecordReader.java
similarity index 69%
rename from exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapRecordReader.java
rename to contrib/format-pcapng/src/test/java/org/apache/drill/exec/store/pcap/TestPcapRecordReader.java
index e1a71b6..43fc3ff3 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapRecordReader.java
+++ b/contrib/format-pcapng/src/test/java/org/apache/drill/exec/store/pcap/TestPcapRecordReader.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.store.pcap;
+import org.apache.drill.PlanTestBase;
import org.apache.drill.exec.store.pcap.decoder.Packet;
import org.apache.drill.test.BaseTestQuery;
import org.apache.drill.exec.rpc.user.QueryDataBatch;
@@ -31,42 +32,42 @@ import static org.junit.Assert.assertEquals;
public class TestPcapRecordReader extends BaseTestQuery {
@BeforeClass
public static void setupTestFiles() {
- dirTestWatcher.copyResourceToRoot(Paths.get("store", "pcap"));
+ dirTestWatcher.copyResourceToRoot(Paths.get("pcap"));
}
@Test
public void testStarQuery() throws Exception {
- runSQLVerifyCount("select * from dfs.`store/pcap/tcp-1.pcap`", 16);
- runSQLVerifyCount("select distinct DST_IP from dfs.`store/pcap/tcp-1.pcap`", 1);
- runSQLVerifyCount("select distinct DsT_IP from dfs.`store/pcap/tcp-1.pcap`", 1);
- runSQLVerifyCount("select distinct dst_ip from dfs.`store/pcap/tcp-1.pcap`", 1);
+ runSQLVerifyCount("select * from dfs.`pcap/tcp-1.pcap`", 16);
+ runSQLVerifyCount("select distinct DST_IP from dfs.`pcap/tcp-1.pcap`", 1);
+ runSQLVerifyCount("select distinct DsT_IP from dfs.`pcap/tcp-1.pcap`", 1);
+ runSQLVerifyCount("select distinct dst_ip from dfs.`pcap/tcp-1.pcap`", 1);
}
@Test
public void testCorruptPCAPQuery() throws Exception {
- runSQLVerifyCount("select * from dfs.`store/pcap/testv1.pcap`", 7000);
+ runSQLVerifyCount("select * from dfs.`pcap/testv1.pcap`", 7000);
}
@Test
public void testTrueCorruptPCAPQuery() throws Exception {
- runSQLVerifyCount("select * from dfs.`store/pcap/testv1.pcap` WHERE is_corrupt=true", 16);
+ runSQLVerifyCount("select * from dfs.`pcap/testv1.pcap` WHERE is_corrupt=true", 16);
}
@Test
public void testNotCorruptPCAPQuery() throws Exception {
- runSQLVerifyCount("select * from dfs.`store/pcap/testv1.pcap` WHERE is_corrupt=false", 6984);
+ runSQLVerifyCount("select * from dfs.`pcap/testv1.pcap` WHERE is_corrupt=false", 6984);
}
@Test
public void testCountQuery() throws Exception {
- runSQLVerifyCount("select count(*) from dfs.`store/pcap/tcp-1.pcap`", 1);
- runSQLVerifyCount("select count(*) from dfs.`store/pcap/tcp-2.pcap`", 1);
+ runSQLVerifyCount("select count(*) from dfs.`pcap/tcp-1.pcap`", 1);
+ runSQLVerifyCount("select count(*) from dfs.`pcap/tcp-2.pcap`", 1);
}
@Test
public void testDistinctQuery() throws Exception {
// omit data field from distinct count for now
- runSQLVerifyCount("select distinct type, network, `timestamp`, src_ip, dst_ip, src_port, dst_port, src_mac_address, dst_mac_address, tcp_session, packet_length from dfs.`store/pcap/tcp-1.pcap`", 1);
+ runSQLVerifyCount("select distinct type, network, `timestamp`, src_ip, dst_ip, src_port, dst_port, src_mac_address, dst_mac_address, tcp_session, packet_length from dfs.`pcap/tcp-1.pcap`", 1);
}
@Test
@@ -86,7 +87,17 @@ public class TestPcapRecordReader extends BaseTestQuery {
@Test
public void checkFlags() throws Exception {
- runSQLVerifyCount("select tcp_session, tcp_ack, tcp_flags from dfs.`store/pcap/synscan.pcap`", 2011);
+ runSQLVerifyCount("select tcp_session, tcp_ack, tcp_flags from dfs.`pcap/synscan.pcap`", 2011);
+ }
+
+ @Test
+ public void testSerDe() throws Exception {
+ String path = "pcap/tcp-1.pcap";
+ dirTestWatcher.copyResourceToRoot(Paths.get(path));
+ testPhysicalPlanSubmission(
+ String.format("select * from dfs.`%s`", path),
+ String.format("select * from table(dfs.`%s`(type=>'pcap'))", path)
+ );
}
private void runSQLVerifyCount(String sql, int expectedRowCount) throws Exception {
@@ -106,4 +117,10 @@ public class TestPcapRecordReader extends BaseTestQuery {
}
assertEquals(expectedRowCount, count);
}
+
+ private void testPhysicalPlanSubmission(String...queries) throws Exception {
+ for (String query : queries) {
+ PlanTestBase.testPhysicalPlanExecutionBasedOnQuery(query);
+ }
+ }
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestSessionizePCAP.java b/contrib/format-pcapng/src/test/java/org/apache/drill/exec/store/pcap/TestSessionizePCAP.java
similarity index 95%
rename from exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestSessionizePCAP.java
rename to contrib/format-pcapng/src/test/java/org/apache/drill/exec/store/pcap/TestSessionizePCAP.java
index addc1a1..001576d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestSessionizePCAP.java
+++ b/contrib/format-pcapng/src/test/java/org/apache/drill/exec/store/pcap/TestSessionizePCAP.java
@@ -23,6 +23,7 @@ import org.apache.drill.exec.physical.rowSet.RowSet;
import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
import org.apache.drill.exec.record.metadata.SchemaBuilder;
import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.pcap.plugin.PcapFormatConfig;
import org.apache.drill.test.ClusterFixture;
import org.apache.drill.test.ClusterTest;
import org.apache.drill.test.QueryBuilder;
@@ -43,15 +44,15 @@ public class TestSessionizePCAP extends ClusterTest {
public static void setup() throws Exception {
ClusterTest.startCluster(ClusterFixture.builder(dirTestWatcher));
- PcapFormatConfig sampleConfig = new PcapFormatConfig(null, true);
+ PcapFormatConfig sampleConfig = new PcapFormatConfig(null, true, true);
cluster.defineFormat("cp", "pcap", sampleConfig);
- dirTestWatcher.copyResourceToRoot(Paths.get("store/pcap/"));
+ dirTestWatcher.copyResourceToRoot(Paths.get("pcap/"));
}
@Test
public void testSessionizedStarQuery() throws Exception {
- String sql = "SELECT * FROM cp.`/store/pcap/http.pcap`";
- String dataFromRemote = readAFileIntoString(dirTestWatcher.getRootDir().getAbsolutePath() + "/store/pcap/dataFromRemote.txt");
+ String sql = "SELECT * FROM cp.`/pcap/http.pcap`";
+ String dataFromRemote = readAFileIntoString(dirTestWatcher.getRootDir().getAbsolutePath() + "/pcap/dataFromRemote.txt");
QueryBuilder q = client.queryBuilder().sql(sql);
RowSet results = q.rowSet();
@@ -104,9 +105,9 @@ public class TestSessionizePCAP extends ClusterTest {
String sql = "SELECT src_ip, dst_ip, src_port, dst_port, src_mac_address, dst_mac_address," +
"session_start_time, session_end_time, session_duration, total_packet_count, data_volume_from_origin, data_volume_from_remote," +
"packet_count_from_origin, packet_count_from_remote, connection_time, tcp_session, is_corrupt, data_from_originator, data_from_remote " +
- "FROM cp.`/store/pcap/http.pcap`";
+ "FROM cp.`/pcap/http.pcap`";
- String dataFromRemote = readAFileIntoString(dirTestWatcher.getRootDir().getAbsolutePath() + "/store/pcap/dataFromRemote.txt");
+ String dataFromRemote = readAFileIntoString(dirTestWatcher.getRootDir().getAbsolutePath() + "/pcap/dataFromRemote.txt");
QueryBuilder q = client.queryBuilder().sql(sql);
RowSet results = q.rowSet();
@@ -156,7 +157,7 @@ public class TestSessionizePCAP extends ClusterTest {
@Test
public void testSerDe() throws Exception {
- String sql = "SELECT COUNT(*) FROM cp.`/store/pcap/http.pcap`";
+ String sql = "SELECT COUNT(*) FROM cp.`/pcap/http.pcap`";
String plan = queryBuilder().sql(sql).explainJson();
long cnt = queryBuilder().physical(plan).singletonLong();
assertEquals("Counts should match", 1L, cnt);
diff --git a/contrib/format-pcapng/src/test/java/org/apache/drill/exec/store/pcapng/TestPcapngRecordReader.java b/contrib/format-pcapng/src/test/java/org/apache/drill/exec/store/pcapng/TestPcapngRecordReader.java
index b069432..ceb76bf 100644
--- a/contrib/format-pcapng/src/test/java/org/apache/drill/exec/store/pcapng/TestPcapngRecordReader.java
+++ b/contrib/format-pcapng/src/test/java/org/apache/drill/exec/store/pcapng/TestPcapngRecordReader.java
@@ -35,6 +35,7 @@ import org.apache.drill.test.QueryBuilder;
import org.apache.drill.test.QueryTestUtil;
import org.apache.drill.test.rowSet.RowSetComparison;
import org.junit.BeforeClass;
+import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -45,6 +46,7 @@ public class TestPcapngRecordReader extends ClusterTest {
public static void setup() throws Exception {
ClusterTest.startCluster(ClusterFixture.builder(dirTestWatcher));
dirTestWatcher.copyResourceToRoot(Paths.get("pcapng/"));
+ dirTestWatcher.copyResourceToRoot(Paths.get("todo/"));
}
@Test
@@ -190,6 +192,20 @@ public class TestPcapngRecordReader extends ClusterTest {
}
@Test
+ @Ignore // todo: infinite loop with current PcapNGReader
+ public void testPcapNG() throws Exception {
+// String sql = "select * from dfs.`todo/dhcp_big_endian.pcapng` limit 1"; // Bad magic number = 000a0a0a
+// String sql = "select * from dfs.`todo/dhcp_little_endian.pcapng` limit 1"; // Bad magic number = 1c0a0a0a
+// String sql = "select * from dfs.`todo/many_interfaces.pcapng` limit 1"; // Bad magic number = ef0a0a0a
+ String sql = "select * from dfs.`todo/mac2.pcap` limit 1"; // Bad magic number = 1c0a0a0a
+ QueryBuilder builder = client.queryBuilder().sql(sql);
+ RowSet sets = builder.rowSet();
+
+ assertEquals(1, sets.rowCount());
+ sets.clear();
+ }
+
+ @Test
public void testGroupBy() throws Exception {
String sql = "select src_ip, count(1), sum(packet_length) from dfs.`pcapng/sniff.pcapng` group by src_ip";
QueryBuilder builder = client.queryBuilder().sql(sql);
@@ -214,4 +230,34 @@ public class TestPcapngRecordReader extends ClusterTest {
String sql = "select * from dfs.`pcapng/drill.pcapng`";
client.queryBuilder().sql(sql).rowSet();
}
-}
\ No newline at end of file
+
+ @Test
+ public void testPcapNGFileWithPcapExt() throws Exception {
+ String sql = "select count(*) from dfs.`pcapng/example.pcap`";
+ String plan = queryBuilder().sql(sql).explainJson();
+ long cnt = queryBuilder().physical(plan).singletonLong();
+
+ assertEquals("Counts should match", 1, cnt);
+ }
+
+ @Test
+ public void testInlineSchema() throws Exception {
+ String sql = "SELECT type, packet_length, `timestamp` FROM table(dfs.`pcapng/sniff.pcapng` " +
+ "(type => 'pcapng', stat => false, sessionizeTCPStreams => true )) where type = 'ARP'";
+ RowSet sets = client.queryBuilder().sql(sql).rowSet();
+
+ TupleMetadata schema = new SchemaBuilder()
+ .addNullable("type", MinorType.VARCHAR)
+ .add("packet_length", MinorType.INT)
+ .add("timestamp", MinorType.TIMESTAMP)
+ .buildSchema();
+
+ RowSet expected = new RowSetBuilder(client.allocator(), schema)
+ .addRow("ARP", 90, Instant.ofEpochMilli(1518010669927L))
+ .addRow("ARP", 90, Instant.ofEpochMilli(1518010671874L))
+ .build();
+
+ assertEquals(2, sets.rowCount());
+ new RowSetComparison(expected).verifyAndClearAll(sets);
+ }
+}
diff --git a/contrib/format-pcapng/src/test/java/org/apache/drill/exec/store/pcapng/TestPcapngStatRecordReader.java b/contrib/format-pcapng/src/test/java/org/apache/drill/exec/store/pcapng/TestPcapngStatRecordReader.java
index d1d966e..9971a88 100644
--- a/contrib/format-pcapng/src/test/java/org/apache/drill/exec/store/pcapng/TestPcapngStatRecordReader.java
+++ b/contrib/format-pcapng/src/test/java/org/apache/drill/exec/store/pcapng/TestPcapngStatRecordReader.java
@@ -27,6 +27,8 @@ import org.apache.drill.exec.physical.rowSet.RowSet;
import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
import org.apache.drill.exec.record.metadata.SchemaBuilder;
import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.pcap.plugin.PcapFormatConfig;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
import org.apache.drill.test.ClusterFixture;
import org.apache.drill.test.ClusterTest;
import org.apache.drill.test.QueryBuilder;
@@ -41,7 +43,7 @@ public class TestPcapngStatRecordReader extends ClusterTest {
@BeforeClass
public static void setup() throws Exception {
ClusterTest.startCluster(ClusterFixture.builder(dirTestWatcher));
- cluster.defineFormat("dfs", "pcapng", new PcapngFormatConfig(null, true));
+ cluster.defineFormat("dfs", "pcapng", new PcapFormatConfig(ImmutableList.of("pcapng"), true, false));
dirTestWatcher.copyResourceToRoot(Paths.get("pcapng/"));
}
@@ -136,4 +138,4 @@ public class TestPcapngStatRecordReader extends ClusterTest {
RowSet expected = new RowSetBuilder(client.allocator(), schema).build();
new RowSetComparison(expected).verifyAndClearAll(sets);
}
-}
\ No newline at end of file
+}
diff --git a/contrib/format-pcapng/src/test/resources/config/oldPcapPlugins.json b/contrib/format-pcapng/src/test/resources/config/oldPcapPlugins.json
new file mode 100644
index 0000000..c8c6645
--- /dev/null
+++ b/contrib/format-pcapng/src/test/resources/config/oldPcapPlugins.json
@@ -0,0 +1,12 @@
+{
+ "type": "file",
+ "formats": {
+ "pcapng": {
+ "type": "pcapng",
+ "stat" : true
+ },
+ "pcap" : {
+ "type" : "pcap"
+ }
+ }
+}
\ No newline at end of file
diff --git a/exec/java-exec/src/test/resources/store/pcap/arpWithNullIP.pcap b/contrib/format-pcapng/src/test/resources/pcap/arpWithNullIP.pcap
similarity index 100%
rename from exec/java-exec/src/test/resources/store/pcap/arpWithNullIP.pcap
rename to contrib/format-pcapng/src/test/resources/pcap/arpWithNullIP.pcap
diff --git a/exec/java-exec/src/test/resources/store/pcap/data-1.pcap b/contrib/format-pcapng/src/test/resources/pcap/data-1.pcap
similarity index 100%
rename from exec/java-exec/src/test/resources/store/pcap/data-1.pcap
rename to contrib/format-pcapng/src/test/resources/pcap/data-1.pcap
diff --git a/contrib/format-pcapng/src/test/resources/pcap/data-2.pcap b/contrib/format-pcapng/src/test/resources/pcap/data-2.pcap
new file mode 100644
index 0000000..a9a4563
Binary files /dev/null and b/contrib/format-pcapng/src/test/resources/pcap/data-2.pcap differ
diff --git a/exec/java-exec/src/test/resources/store/pcap/dataFromRemote.txt b/contrib/format-pcapng/src/test/resources/pcap/dataFromRemote.txt
similarity index 100%
rename from exec/java-exec/src/test/resources/store/pcap/dataFromRemote.txt
rename to contrib/format-pcapng/src/test/resources/pcap/dataFromRemote.txt
diff --git a/exec/java-exec/src/test/resources/store/pcap/http.pcap b/contrib/format-pcapng/src/test/resources/pcap/http.pcap
similarity index 100%
rename from exec/java-exec/src/test/resources/store/pcap/http.pcap
rename to contrib/format-pcapng/src/test/resources/pcap/http.pcap
diff --git a/exec/java-exec/src/test/resources/store/pcap/synscan.pcap b/contrib/format-pcapng/src/test/resources/pcap/synscan.pcap
similarity index 100%
rename from exec/java-exec/src/test/resources/store/pcap/synscan.pcap
rename to contrib/format-pcapng/src/test/resources/pcap/synscan.pcap
diff --git a/exec/java-exec/src/test/resources/store/pcap/tcp-1.pcap b/contrib/format-pcapng/src/test/resources/pcap/tcp-1.pcap
similarity index 100%
rename from exec/java-exec/src/test/resources/store/pcap/tcp-1.pcap
rename to contrib/format-pcapng/src/test/resources/pcap/tcp-1.pcap
diff --git a/exec/java-exec/src/test/resources/store/pcap/tcp-2.pcap b/contrib/format-pcapng/src/test/resources/pcap/tcp-2.pcap
similarity index 100%
rename from exec/java-exec/src/test/resources/store/pcap/tcp-2.pcap
rename to contrib/format-pcapng/src/test/resources/pcap/tcp-2.pcap
diff --git a/exec/java-exec/src/test/resources/store/pcap/testv1.pcap b/contrib/format-pcapng/src/test/resources/pcap/testv1.pcap
similarity index 100%
rename from exec/java-exec/src/test/resources/store/pcap/testv1.pcap
rename to contrib/format-pcapng/src/test/resources/pcap/testv1.pcap
diff --git a/contrib/format-pcapng/src/test/resources/pcapng/example.pcap b/contrib/format-pcapng/src/test/resources/pcapng/example.pcap
new file mode 100644
index 0000000..002cb8d
Binary files /dev/null and b/contrib/format-pcapng/src/test/resources/pcapng/example.pcap differ
diff --git a/contrib/format-pcapng/src/test/resources/todo/dhcp.pcapng b/contrib/format-pcapng/src/test/resources/todo/dhcp.pcapng
new file mode 100644
index 0000000..d7d2e33
Binary files /dev/null and b/contrib/format-pcapng/src/test/resources/todo/dhcp.pcapng differ
diff --git a/contrib/format-pcapng/src/test/resources/todo/dhcp_big_endian.pcapng b/contrib/format-pcapng/src/test/resources/todo/dhcp_big_endian.pcapng
new file mode 100644
index 0000000..894b361
Binary files /dev/null and b/contrib/format-pcapng/src/test/resources/todo/dhcp_big_endian.pcapng differ
diff --git a/contrib/format-pcapng/src/test/resources/todo/dhcp_little_endian.pcapng b/contrib/format-pcapng/src/test/resources/todo/dhcp_little_endian.pcapng
new file mode 100644
index 0000000..3378440
Binary files /dev/null and b/contrib/format-pcapng/src/test/resources/todo/dhcp_little_endian.pcapng differ
diff --git a/contrib/format-pcapng/src/test/resources/todo/many_interfaces.pcapng b/contrib/format-pcapng/src/test/resources/todo/many_interfaces.pcapng
new file mode 100644
index 0000000..6a8b397
Binary files /dev/null and b/contrib/format-pcapng/src/test/resources/todo/many_interfaces.pcapng differ
diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseTableProvider.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseTableProvider.java
index 43fa88b..592479e 100644
--- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseTableProvider.java
+++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseTableProvider.java
@@ -49,7 +49,8 @@ public class TestHBaseTableProvider extends BaseHBaseTest {
@Test
public void testTableProvider() throws StoreException {
LogicalPlanPersistence lp = PhysicalPlanReaderTestFactory.defaultLogicalPlanPersistence(config);
- PersistentStore<String> hbaseStore = provider.getOrCreateStore(PersistentStoreConfig.newJacksonBuilder(lp.getMapper(), String.class).name("hbase").build());
+ PersistentStore<String> hbaseStore = provider.getOrCreateStore(
+ PersistentStoreConfig.newJacksonBuilder(lp.getMapper(), String.class).name("hbase").build());
hbaseStore.put("", "v0");
hbaseStore.put("k1", "v1");
hbaseStore.put("k2", "v2");
@@ -66,7 +67,8 @@ public class TestHBaseTableProvider extends BaseHBaseTest {
assertEquals(7, Lists.newArrayList(hbaseStore.getAll()).size());
- PersistentStore<String> hbaseTestStore = provider.getOrCreateStore(PersistentStoreConfig.newJacksonBuilder(lp.getMapper(), String.class).name("hbase.test").build());
+ PersistentStore<String> hbaseTestStore = provider.getOrCreateStore(
+ PersistentStoreConfig.newJacksonBuilder(lp.getMapper(), String.class).name("hbase.test").build());
hbaseTestStore.put("", "v0");
hbaseTestStore.put("k1", "v1");
hbaseTestStore.put("k2", "v2");
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsScanFramework.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsScanFramework.java
index 842c798..9fe2f94 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsScanFramework.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsScanFramework.java
@@ -62,7 +62,6 @@ public class ColumnsScanFramework extends FileScanFramework {
/**
* Implementation of the columns array schema negotiator.
*/
-
public static class ColumnsSchemaNegotiatorImpl extends FileSchemaNegotiatorImpl
implements ColumnsSchemaNegotiator {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileScanFramework.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileScanFramework.java
index b241528..12e1bd0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileScanFramework.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileScanFramework.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import java.util.Optional;
import org.apache.drill.common.exceptions.ChildErrorContext;
import org.apache.drill.common.exceptions.CustomErrorContext;
@@ -188,11 +189,18 @@ public class FileScanFramework extends ManagedScanFramework {
}
public abstract ManagedReader<? extends FileSchemaNegotiator> newReader();
+
+ /**
+ * @return FileScanFramework, or an empty Optional if it has not been bound yet via {@link #bind(ManagedScanFramework)}
+ */
+ protected Optional<FileScanFramework> fileFramework() {
+ return Optional.ofNullable(fileFramework);
+ }
}
private ImplicitColumnManager metadataManager;
private DrillFileSystem dfs;
- private final List<FileSplit> spilts = new ArrayList<>();
+ private final List<FileSplit> splits = new ArrayList<>();
private Iterator<FileSplit> splitIter;
private FileSplit currentSplit;
@@ -230,9 +238,9 @@ public class FileScanFramework extends ManagedScanFramework {
Path path = dfs.makeQualified(work.getPath());
paths.add(path);
FileSplit split = new FileSplit(path, work.getStart(), work.getLength(), new String[]{""});
- spilts.add(split);
+ splits.add(split);
}
- splitIter = spilts.iterator();
+ splitIter = splits.iterator();
// Create the metadata manager to handle file metadata columns
// (so-called implicit columns and partition columns.)
@@ -274,4 +282,8 @@ public class FileScanFramework extends ManagedScanFramework {
.build(logger);
}
}
+
+ public DrillFileSystem fileSystem() {
+ return dfs;
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
index 5c290d3..d701da4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
@@ -382,8 +382,7 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
*/
protected EasyFormatPlugin(String name, DrillbitContext context, Configuration fsConf,
StoragePluginConfig storageConfig, T formatConfig, boolean readable, boolean writable,
- boolean blockSplittable,
- boolean compressible, List<String> extensions, String defaultName) {
+ boolean blockSplittable, boolean compressible, List<String> extensions, String defaultName) {
this.name = name == null ? defaultName : name;
easyConfig = EasyFormatConfig.builder()
.matcher(new BasicFormatMatcher(this, fsConf, extensions, compressible))
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapFormatConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapFormatConfig.java
deleted file mode 100644
index 1312151..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapFormatConfig.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.pcap;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonInclude;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-
-import org.apache.drill.common.PlanStringBuilder;
-import org.apache.drill.common.logical.FormatPluginConfig;
-import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
-
-import java.util.List;
-import java.util.Objects;
-
-@JsonTypeName(PcapFormatPlugin.PLUGIN_NAME)
-public class PcapFormatConfig implements FormatPluginConfig {
- private static final List<String> DEFAULT_EXTNS = ImmutableList.of(PcapFormatPlugin.PLUGIN_NAME);
-
- private final List<String> extensions;
- private final boolean sessionizeTCPStreams;
-
- @JsonCreator
- public PcapFormatConfig(
- @JsonProperty("extensions") List<String> extensions,
- @JsonProperty("sessionizeTCPStreams") Boolean sessionizeTCPStreams) {
- this.extensions = extensions == null ?
- DEFAULT_EXTNS : ImmutableList.copyOf(extensions);
- this.sessionizeTCPStreams = sessionizeTCPStreams == null ? false : sessionizeTCPStreams;
- }
-
- @JsonInclude(JsonInclude.Include.NON_DEFAULT)
- public List<String> getExtensions() {
- return extensions;
- }
-
- @JsonInclude(JsonInclude.Include.NON_DEFAULT)
- public boolean getSessionizeTCPStreams() {
- return sessionizeTCPStreams;
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(extensions, sessionizeTCPStreams);
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj) {
- return true;
- }
- if (obj == null || getClass() != obj.getClass()) {
- return false;
- }
- PcapFormatConfig other = (PcapFormatConfig) obj;
- return Objects.equals(extensions, other.extensions) &&
- Objects.equals(sessionizeTCPStreams, other.sessionizeTCPStreams);
- }
-
- @Override
- public String toString() {
- return new PlanStringBuilder(this)
- .field("extensions", extensions)
- .field("sessionizeTCPStreams", sessionizeTCPStreams)
- .toString();
- }
-}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapFormatPlugin.java
deleted file mode 100644
index 69dbfc5..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapFormatPlugin.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.pcap;
-
-import org.apache.drill.common.types.TypeProtos;
-import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileReaderFactory;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileScanBuilder;
-import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
-import org.apache.drill.exec.server.options.OptionManager;
-import org.apache.drill.exec.store.dfs.easy.EasySubScan;
-import org.apache.drill.common.logical.StoragePluginConfig;
-import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.drill.exec.store.pcap.PcapBatchReader.PcapReaderConfig;
-
-public class PcapFormatPlugin extends EasyFormatPlugin<PcapFormatConfig> {
-
- public static final String PLUGIN_NAME = "pcap";
-
- private static class PcapReaderFactory extends FileReaderFactory {
-
- private final PcapReaderConfig readerConfig;
- private final int maxRecords;
-
- public PcapReaderFactory(PcapReaderConfig config, int maxRecords) {
- readerConfig = config;
- this.maxRecords = maxRecords;
- }
-
- @Override
- public ManagedReader<? extends FileSchemaNegotiator> newReader() {
- return new PcapBatchReader(readerConfig, maxRecords);
- }
- }
-
- public PcapFormatPlugin(String name, DrillbitContext context,
- Configuration fsConf, StoragePluginConfig storageConfig,
- PcapFormatConfig formatConfig) {
- super(name, easyConfig(fsConf, formatConfig), context, storageConfig, formatConfig);
- }
-
- private static EasyFormatConfig easyConfig(Configuration fsConf, PcapFormatConfig pluginConfig) {
- return EasyFormatConfig.builder()
- .readable(true)
- .writable(false)
- .blockSplittable(false)
- .compressible(true)
- .supportsProjectPushdown(true)
- .extensions(pluginConfig.getExtensions())
- .fsConf(fsConf)
- .defaultName(PLUGIN_NAME)
- .useEnhancedScan(true)
- .supportsLimitPushdown(true)
- .build();
- }
-
- @Override
- public ManagedReader<? extends FileSchemaNegotiator> newBatchReader(EasySubScan scan, OptionManager options) {
- return new PcapBatchReader(new PcapReaderConfig(this), scan.getMaxRecords());
- }
-
- @Override
- protected FileScanBuilder frameworkBuilder(OptionManager options, EasySubScan scan) {
- FileScanBuilder builder = new FileScanBuilder();
- builder.setReaderFactory(new PcapReaderFactory(new PcapReaderConfig(this), scan.getMaxRecords()));
- initScanBuilder(builder, scan);
- builder.nullType(Types.optional(TypeProtos.MinorType.VARCHAR));
- return builder;
- }
-}
diff --git a/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json b/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json
index 4ee29c2..81f63f2 100644
--- a/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json
+++ b/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json
@@ -38,9 +38,6 @@
"type" : "json",
"extensions" : [ "json" ]
},
- "pcap" : {
- "type" : "pcap"
- },
"avro" : {
"type" : "avro",
"extensions" : [ "avro" ]
@@ -98,9 +95,6 @@
"type" : "json",
"extensions" : [ "json" ]
},
- "pcap" : {
- "type" : "pcap"
- },
"avro" : {
"type" : "avro"
},
@@ -135,9 +129,6 @@
"type" : "json",
"extensions" : [ "json" ]
},
- "pcap" : {
- "type" : "pcap"
- },
"parquet" : {
"type" : "parquet"
},
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/TestWithZookeeper.java b/exec/java-exec/src/test/java/org/apache/drill/exec/TestWithZookeeper.java
index a55c7c3..1ceb91e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/TestWithZookeeper.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/TestWithZookeeper.java
@@ -17,6 +17,10 @@
*/
package org.apache.drill.exec;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryNTimes;
+import org.apache.drill.common.config.DrillConfig;
import org.junit.After;
import org.junit.Before;
@@ -35,4 +39,17 @@ public class TestWithZookeeper extends ExecTest {
public void tearDown() throws Exception {
zkHelper.stopZookeeper();
}
+
+ protected CuratorFramework createCurator() {
+ String connect = zkHelper.getConnectionString();
+ DrillConfig config = zkHelper.getConfig();
+
+ CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
+ .namespace(zkHelper.getConfig().getString(ExecConstants.ZK_ROOT))
+ .retryPolicy(new RetryNTimes(1, 100))
+ .connectionTimeoutMs(config.getInt(ExecConstants.ZK_TIMEOUT))
+ .connectString(connect);
+
+ return builder.build();
+ }
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/FormatPluginSerDeTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/FormatPluginSerDeTest.java
index 6001773..f8dee17 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/FormatPluginSerDeTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/FormatPluginSerDeTest.java
@@ -82,16 +82,6 @@ public class FormatPluginSerDeTest extends PlanTestBase {
}
@Test
- public void testPcap() throws Exception {
- String path = "store/pcap/tcp-1.pcap";
- dirTestWatcher.copyResourceToRoot(Paths.get(path));
- testPhysicalPlanSubmission(
- String.format("select * from dfs.`%s`", path),
- String.format("select * from table(dfs.`%s`(type=>'pcap'))", path)
- );
- }
-
- @Test
public void testJson() throws Exception {
testPhysicalPlanSubmission(
"select * from cp.`donuts.json`",
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestPStoreProviders.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestPStoreProviders.java
index efc8a7b..cab9877 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestPStoreProviders.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestPStoreProviders.java
@@ -19,8 +19,6 @@ package org.apache.drill.exec.store.sys;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.retry.RetryNTimes;
import org.apache.drill.categories.SlowTest;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.util.DrillFileUtils;
@@ -144,17 +142,4 @@ public class TestPStoreProviders extends TestWithZookeeper {
}
}
}
-
- private CuratorFramework createCurator() {
- String connect = zkHelper.getConnectionString();
- DrillConfig config = zkHelper.getConfig();
-
- CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
- .namespace(zkHelper.getConfig().getString(ExecConstants.ZK_ROOT))
- .retryPolicy(new RetryNTimes(1, 100))
- .connectionTimeoutMs(config.getInt(ExecConstants.ZK_TIMEOUT))
- .connectString(connect);
-
- return builder.build();
- }
}
diff --git a/exec/java-exec/src/test/resources/plugins/mock-plugin-upgrade.json b/exec/java-exec/src/test/resources/plugins/mock-plugin-upgrade.json
index fd86af2..e8709bc 100644
--- a/exec/java-exec/src/test/resources/plugins/mock-plugin-upgrade.json
+++ b/exec/java-exec/src/test/resources/plugins/mock-plugin-upgrade.json
@@ -33,9 +33,6 @@
"type" : "json",
"extensions" : [ "json" ]
},
- "pcap" : {
- "type" : "pcap"
- },
"avro" : {
"type" : "avro",
"extensions" : [ "avro" ]
@@ -147,9 +144,6 @@
"type" : "json",
"extensions" : [ "json" ]
},
- "pcap" : {
- "type" : "pcap"
- },
"avro" : {
"type" : "avro",
"extensions" : [ "avro" ]
diff --git a/exec/java-exec/src/test/resources/store/pcap/data-2.pcap b/exec/java-exec/src/test/resources/store/pcap/data-2.pcap
deleted file mode 100644
index 1f23ee8..0000000
Binary files a/exec/java-exec/src/test/resources/store/pcap/data-2.pcap and /dev/null differ