You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by vo...@apache.org on 2019/11/26 13:33:07 UTC
[drill] 01/02: DRILL-7443: Enable PCAP Plugin to Reassemble TCP Streams
This is an automated email from the ASF dual-hosted git repository.
volodymyr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
commit bd549b9cbfa538038ec440aa8c623b3bdb40d50b
Author: Charles Givre <cg...@apache.org>
AuthorDate: Fri Nov 22 16:04:29 2019 -0500
DRILL-7443: Enable PCAP Plugin to Reassemble TCP Streams
closes #1898
---
.../drill/exec/store/pcap/PcapBatchReader.java | 203 ++++++++--
.../drill/exec/store/pcap/PcapFormatConfig.java | 8 +-
.../drill/exec/store/pcap/PcapRecordReader.java | 418 ---------------------
.../drill/exec/store/pcap/decoder/Packet.java | 49 ++-
.../exec/store/pcap/decoder/TcpHandshake.java | 144 +++++++
.../drill/exec/store/pcap/decoder/TcpSession.java | 321 ++++++++++++++++
.../drill/exec/store/pcap/schema/PcapTypes.java | 5 +-
.../drill/exec/store/pcap/schema/Schema.java | 72 ++--
.../store/dfs/TestFormatPluginOptionExtractor.java | 4 +-
.../drill/exec/store/pcap/TestPcapDecoder.java | 8 +-
.../drill/exec/store/pcap/TestPcapEVFReader.java | 1 -
.../drill/exec/store/pcap/TestSessionizePCAP.java | 107 ++++++
.../test/resources/store/pcap/attack-trace.pcap | Bin 0 -> 189103 bytes
13 files changed, 850 insertions(+), 490 deletions(-)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapBatchReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapBatchReader.java
index c9caa24..5e4a46c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapBatchReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapBatchReader.java
@@ -26,6 +26,7 @@ import org.apache.drill.exec.record.metadata.SchemaBuilder;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.store.pcap.decoder.Packet;
import org.apache.drill.exec.store.pcap.decoder.PacketDecoder;
+import org.apache.drill.exec.store.pcap.decoder.TcpSession;
import org.apache.drill.exec.store.pcap.schema.Schema;
import org.apache.drill.exec.vector.accessor.ScalarWriter;
import org.apache.hadoop.mapred.FileSplit;
@@ -35,12 +36,14 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
import static org.apache.drill.exec.store.pcap.PcapFormatUtils.parseBytesToASCII;
public class PcapBatchReader implements ManagedReader<FileSchemaNegotiator> {
- public static final int BUFFER_SIZE = 500_000;
+ protected static final int BUFFER_SIZE = 500_000;
private static final Logger logger = LoggerFactory.getLogger(PcapBatchReader.class);
@@ -116,17 +119,56 @@ public class PcapBatchReader implements ManagedReader<FileSchemaNegotiator> {
private ScalarWriter isCorruptWriter;
+ private PcapReaderConfig readerConfig;
+
+
+ // Writers for TCP Sessions
+ private ScalarWriter sessionStartTimeWriter;
+
+ private ScalarWriter sessionEndTimeWriter;
+
+ private ScalarWriter sessionDurationWriter;
+
+ private ScalarWriter connectionTimeWriter;
+
+ private ScalarWriter packetCountWriter;
+
+ private ScalarWriter originPacketCounterWriter;
+
+ private ScalarWriter remotePacketCounterWriter;
+
+ private ScalarWriter originDataVolumeWriter;
+
+ private ScalarWriter remoteDataVolumeWriter;
+
+ private ScalarWriter hostDataWriter;
+
+ private ScalarWriter remoteDataWriter;
+
+
+ private Map<Long, TcpSession> sessionQueue;
+
public static class PcapReaderConfig {
protected final PcapFormatPlugin plugin;
+ public boolean sessionizeTCPStreams;
+
+ private PcapFormatConfig config;
+
public PcapReaderConfig(PcapFormatPlugin plugin) {
this.plugin = plugin;
+ this.config = plugin.getConfig();
+ this.sessionizeTCPStreams = config.sessionizeTCPStreams;
}
}
public PcapBatchReader(PcapReaderConfig readerConfig) {
+ this.readerConfig = readerConfig;
+ if (readerConfig.sessionizeTCPStreams) {
+ sessionQueue = new HashMap<>();
+ }
}
@Override
@@ -134,46 +176,14 @@ public class PcapBatchReader implements ManagedReader<FileSchemaNegotiator> {
split = negotiator.split();
openFile(negotiator);
SchemaBuilder builder = new SchemaBuilder();
- Schema pcapSchema = new Schema();
+ Schema pcapSchema = new Schema(readerConfig.sessionizeTCPStreams);
TupleMetadata schema = pcapSchema.buildSchema(builder);
negotiator.setTableSchema(schema, false);
ResultSetLoader loader = negotiator.build();
// Creates writers for all fields (Since schema is known)
rowWriter = loader.writer();
- typeWriter = rowWriter.scalar("type");
- timestampWriter = rowWriter.scalar("packet_timestamp");
- timestampMicroWriter = rowWriter.scalar("timestamp_micro");
- networkWriter = rowWriter.scalar("network");
- srcMacAddressWriter = rowWriter.scalar("src_mac_address");
- dstMacAddressWriter = rowWriter.scalar("dst_mac_address");
- dstIPWriter = rowWriter.scalar("dst_ip");
- srcIPWriter = rowWriter.scalar("src_ip");
- srcPortWriter = rowWriter.scalar("src_port");
- dstPortWriter = rowWriter.scalar("dst_port");
- packetLengthWriter = rowWriter.scalar("packet_length");
-
- //Writers for TCP Packets
- tcpSessionWriter = rowWriter.scalar("tcp_session");
- tcpSequenceWriter = rowWriter.scalar("tcp_sequence");
- tcpAckNumberWriter = rowWriter.scalar("tcp_ack");
- tcpFlagsWriter = rowWriter.scalar("tcp_flags");
- tcpParsedFlagsWriter = rowWriter.scalar("tcp_parsed_flags");
- tcpNsWriter = rowWriter.scalar("tcp_flags_ns");
- tcpCwrWriter = rowWriter.scalar("tcp_flags_cwr");
- tcpEceWriter = rowWriter.scalar("tcp_flags_ece");
- tcpFlagsEceEcnCapableWriter = rowWriter.scalar("tcp_flags_ece_ecn_capable");
- tcpFlagsCongestionWriter = rowWriter.scalar("tcp_flags_ece_congestion_experienced");
-
- tcpUrgWriter = rowWriter.scalar("tcp_flags_urg");
- tcpAckWriter = rowWriter.scalar("tcp_flags_ack");
- tcpPshWriter = rowWriter.scalar("tcp_flags_psh");
- tcpRstWriter = rowWriter.scalar("tcp_flags_rst");
- tcpSynWriter = rowWriter.scalar("tcp_flags_syn");
- tcpFinWriter = rowWriter.scalar("tcp_flags_fin");
-
- dataWriter = rowWriter.scalar("data");
- isCorruptWriter = rowWriter.scalar("is_corrupt");
+ populateColumnWriters(rowWriter);
return true;
}
@@ -190,6 +200,14 @@ public class PcapBatchReader implements ManagedReader<FileSchemaNegotiator> {
@Override
public void close() {
+
+ /* This warning could occur in the event of a corrupt or incomplete PCAP file. Specifically,
+ * if a session is started in one file and the end of the session is not captured in the same file.
+ */
+ if (sessionQueue != null && !sessionQueue.isEmpty()) {
+ logger.warn("Unclosed sessions remaining in PCAP");
+ }
+
try {
fsStream.close();
} catch (IOException e) {
@@ -217,7 +235,69 @@ public class PcapBatchReader implements ManagedReader<FileSchemaNegotiator> {
}
}
+ private void populateColumnWriters(RowSetLoader rowWriter) {
+ if (readerConfig.sessionizeTCPStreams) {
+ srcMacAddressWriter = rowWriter.scalar("src_mac_address");
+ dstMacAddressWriter = rowWriter.scalar("dst_mac_address");
+ dstIPWriter = rowWriter.scalar("dst_ip");
+ srcIPWriter = rowWriter.scalar("src_ip");
+ srcPortWriter = rowWriter.scalar("src_port");
+ dstPortWriter = rowWriter.scalar("dst_port");
+ sessionStartTimeWriter = rowWriter.scalar("session_start_time");
+ sessionEndTimeWriter = rowWriter.scalar("session_end_time");
+ sessionDurationWriter = rowWriter.scalar("session_duration");
+ connectionTimeWriter = rowWriter.scalar("connection_time");
+ tcpSessionWriter = rowWriter.scalar("tcp_session");
+ packetCountWriter = rowWriter.scalar("total_packet_count");
+ hostDataWriter = rowWriter.scalar("data_from_originator");
+ remoteDataWriter = rowWriter.scalar("data_from_remote");
+
+ originPacketCounterWriter = rowWriter.scalar("packet_count_from_origin");
+ remotePacketCounterWriter = rowWriter.scalar("packet_count_from_remote");
+ originDataVolumeWriter = rowWriter.scalar("data_volume_from_origin");
+ remoteDataVolumeWriter = rowWriter.scalar("data_volume_from_remote");
+ isCorruptWriter = rowWriter.scalar("is_corrupt");
+
+ } else {
+ typeWriter = rowWriter.scalar("type");
+ timestampWriter = rowWriter.scalar("packet_timestamp");
+ timestampMicroWriter = rowWriter.scalar("timestamp_micro");
+ networkWriter = rowWriter.scalar("network");
+ srcMacAddressWriter = rowWriter.scalar("src_mac_address");
+ dstMacAddressWriter = rowWriter.scalar("dst_mac_address");
+ dstIPWriter = rowWriter.scalar("dst_ip");
+ srcIPWriter = rowWriter.scalar("src_ip");
+ srcPortWriter = rowWriter.scalar("src_port");
+ dstPortWriter = rowWriter.scalar("dst_port");
+ packetLengthWriter = rowWriter.scalar("packet_length");
+
+ //Writers for TCP Packets
+ tcpSessionWriter = rowWriter.scalar("tcp_session");
+ tcpSequenceWriter = rowWriter.scalar("tcp_sequence");
+ tcpAckNumberWriter = rowWriter.scalar("tcp_ack");
+ tcpFlagsWriter = rowWriter.scalar("tcp_flags");
+ tcpParsedFlagsWriter = rowWriter.scalar("tcp_parsed_flags");
+ tcpNsWriter = rowWriter.scalar("tcp_flags_ns");
+ tcpCwrWriter = rowWriter.scalar("tcp_flags_cwr");
+ tcpEceWriter = rowWriter.scalar("tcp_flags_ece");
+ tcpFlagsEceEcnCapableWriter = rowWriter.scalar("tcp_flags_ece_ecn_capable");
+ tcpFlagsCongestionWriter = rowWriter.scalar("tcp_flags_ece_congestion_experienced");
+
+ tcpUrgWriter = rowWriter.scalar("tcp_flags_urg");
+ tcpAckWriter = rowWriter.scalar("tcp_flags_ack");
+ tcpPshWriter = rowWriter.scalar("tcp_flags_psh");
+ tcpRstWriter = rowWriter.scalar("tcp_flags_rst");
+ tcpSynWriter = rowWriter.scalar("tcp_flags_syn");
+ tcpFinWriter = rowWriter.scalar("tcp_flags_fin");
+
+ dataWriter = rowWriter.scalar("data");
+ isCorruptWriter = rowWriter.scalar("is_corrupt");
+ }
+ }
+
private boolean parseNextPacket(RowSetLoader rowWriter) {
+
+ // Decode the packet
Packet packet = new Packet();
if (offset >= validBytes) {
@@ -234,8 +314,28 @@ public class PcapBatchReader implements ManagedReader<FileSchemaNegotiator> {
packet.setIsCorrupt(true);
logger.debug("Invalid packet at offset {}", old);
}
- addDataToTable(packet, decoder.getNetwork(), rowWriter);
+ // If we are sessionizing TCP streams, add the packet to its session
+ if (readerConfig.sessionizeTCPStreams) {
+ // If the session has not been seen before, add it to the queue
+ long sessionID = packet.getSessionHash();
+ if (!sessionQueue.containsKey(sessionID)) {
+ logger.debug("Adding session {} to session queue.", sessionID);
+ sessionQueue.put(sessionID, new TcpSession(sessionID));
+ }
+
+ // When the session is closed, write it and remove it from the session queue.
+ sessionQueue.get(sessionID).addPacket(packet);
+ if (sessionQueue.get(sessionID).connectionClosed()) {
+ // Write out the session
+ addSessionDataToTable(sessionQueue.get(sessionID), rowWriter);
+ // Remove from the queue
+ sessionQueue.remove(sessionID);
+ }
+
+ } else {
+ addDataToTable(packet, decoder.getNetwork(), rowWriter);
+ }
return true;
}
@@ -268,7 +368,35 @@ public class PcapBatchReader implements ManagedReader<FileSchemaNegotiator> {
return true;
}
- private boolean addDataToTable(Packet packet, int networkType, RowSetLoader rowWriter) {
+ private void addSessionDataToTable(TcpSession session, RowSetLoader rowWriter) {
+ rowWriter.start();
+
+ sessionStartTimeWriter.setTimestamp(session.getSessionStartTime());
+ sessionEndTimeWriter.setTimestamp(session.getSessionEndTime());
+ sessionDurationWriter.setPeriod(session.getSessionDuration());
+ connectionTimeWriter.setPeriod(session.getConnectionTime());
+
+ srcMacAddressWriter.setString(session.getSrcMac());
+ dstMacAddressWriter.setString(session.getDstMac());
+ srcIPWriter.setString(session.getSrcIP());
+ dstIPWriter.setString(session.getDstIP());
+ srcPortWriter.setInt(session.getSrcPort());
+ dstPortWriter.setInt(session.getDstPort());
+ tcpSessionWriter.setLong(session.getSessionID());
+ packetCountWriter.setInt(session.getPacketCount());
+
+ originPacketCounterWriter.setInt(session.getPacketCountFromOrigin());
+ remotePacketCounterWriter.setInt(session.getPacketCountFromRemote());
+ originDataVolumeWriter.setInt(session.getDataFromOriginator().length);
+ remoteDataVolumeWriter.setInt(session.getDataFromRemote().length);
+ isCorruptWriter.setBoolean(session.hasCorruptedData());
+
+ hostDataWriter.setString(session.getDataFromOriginatorAsString());
+ remoteDataWriter.setString(session.getDataFromRemoteAsString());
+ rowWriter.save();
+ }
+
+ private void addDataToTable(Packet packet, int networkType, RowSetLoader rowWriter) {
rowWriter.start();
typeWriter.setString(packet.getPacketType());
@@ -312,6 +440,5 @@ public class PcapBatchReader implements ManagedReader<FileSchemaNegotiator> {
// TODO Parse Data Packet Here:
// Description of work in
// DRILL-7400: Add Packet Decoders with Interface to Drill
- return true;
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapFormatConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapFormatConfig.java
index c06ddf9..133b5d7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapFormatConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapFormatConfig.java
@@ -34,13 +34,16 @@ public class PcapFormatConfig implements FormatPluginConfig {
public List<String> extensions;
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
+ public boolean sessionizeTCPStreams = false;
+
+ @JsonInclude(JsonInclude.Include.NON_DEFAULT)
public List<String> getExtensions() {
return extensions == null ? DEFAULT_EXTS : extensions;
}
@Override
public int hashCode() {
- return Arrays.hashCode(new Object[]{extensions});
+ return Arrays.hashCode(new Object[]{extensions, sessionizeTCPStreams});
}
@Override
@@ -52,6 +55,7 @@ public class PcapFormatConfig implements FormatPluginConfig {
return false;
}
PcapFormatConfig other = (PcapFormatConfig) obj;
- return Objects.equal(extensions, other.extensions);
+ return Objects.equal(extensions, other.extensions)
+ && Objects.equal(sessionizeTCPStreams, other.sessionizeTCPStreams);
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapRecordReader.java
deleted file mode 100644
index f9b8a72..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapRecordReader.java
+++ /dev/null
@@ -1,418 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.pcap;
-
-import org.apache.drill.exec.vector.NullableBitVector;
-import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
-import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.types.TypeProtos;
-import org.apache.drill.common.types.TypeProtos.MajorType;
-import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.expr.TypeHelper;
-import org.apache.drill.exec.ops.OperatorContext;
-import org.apache.drill.exec.physical.impl.OutputMutator;
-import org.apache.drill.exec.record.MaterializedField;
-import org.apache.drill.exec.store.AbstractRecordReader;
-import org.apache.drill.exec.store.pcap.decoder.Packet;
-import org.apache.drill.exec.store.pcap.decoder.PacketDecoder;
-import org.apache.drill.exec.store.pcap.dto.ColumnDto;
-import org.apache.drill.exec.store.pcap.schema.PcapTypes;
-import org.apache.drill.exec.store.pcap.schema.Schema;
-import org.apache.drill.exec.vector.NullableBigIntVector;
-import org.apache.drill.exec.vector.NullableIntVector;
-import org.apache.drill.exec.vector.NullableTimeStampVector;
-import org.apache.drill.exec.vector.NullableVarCharVector;
-import org.apache.drill.exec.vector.ValueVector;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Map;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.apache.drill.exec.store.pcap.PcapFormatUtils.parseBytesToASCII;
-
-public class PcapRecordReader extends AbstractRecordReader {
- static final int BUFFER_SIZE = 500_000; // this should be relatively large relative to max packet
-
- private static final Logger logger = LoggerFactory.getLogger(PcapRecordReader.class);
- private static final int BATCH_SIZE = 40_000;
-
- private OutputMutator output;
-
- private PacketDecoder decoder;
- private ImmutableList<ProjectedColumnInfo> projectedCols;
- private FileSystem fs;
-
- private byte[] buffer;
- private int offset = 0;
- private FSDataInputStream in;
- private int validBytes;
-
- private final Path pathToFile;
- private List<SchemaPath> projectedColumns;
-
- private static final Map<PcapTypes, MinorType> TYPES;
-
- private static class ProjectedColumnInfo {
- ValueVector vv;
- ColumnDto pcapColumn;
- }
-
- static {
- TYPES = ImmutableMap.<PcapTypes, TypeProtos.MinorType>builder()
- .put(PcapTypes.STRING, MinorType.VARCHAR)
- .put(PcapTypes.INTEGER, MinorType.INT)
- .put(PcapTypes.LONG, MinorType.BIGINT)
- .put(PcapTypes.TIMESTAMP, MinorType.TIMESTAMP)
- .put(PcapTypes.BOOLEAN, MinorType.BIT)
- .build();
- }
-
- public PcapRecordReader(final Path pathToFile,
- final FileSystem fileSystem,
- final List<SchemaPath> projectedColumns) {
- this.fs = fileSystem;
- this.pathToFile = fs.makeQualified(pathToFile);
- this.projectedColumns = projectedColumns;
- }
-
- @Override
- public void setup(final OperatorContext context, final OutputMutator output) throws ExecutionSetupException {
- try {
- this.output = output;
- this.in = fs.open(pathToFile);
- this.decoder = new PacketDecoder(in);
- this.buffer = new byte[BUFFER_SIZE + decoder.getMaxLength()];
- this.validBytes = in.read(buffer);
- this.projectedCols = getProjectedColsIfItNull();
- setColumns(projectedColumns);
- } catch (IOException io) {
- throw UserException.dataReadError(io)
- .addContext("File name:", pathToFile.toUri().getPath())
- .build(logger);
- }
- }
-
- @Override
- public int next() {
- try {
- return parsePcapFilesAndPutItToTable();
- } catch (IOException io) {
- throw UserException.dataReadError(io)
- .addContext("Trouble with reading packets in file!")
- .build(logger);
- }
- }
-
- @Override
- public void close() throws Exception {
- in.close();
- }
-
- private ImmutableList<ProjectedColumnInfo> getProjectedColsIfItNull() {
- return projectedCols != null ? projectedCols : initCols(new Schema());
- }
-
- private ImmutableList<ProjectedColumnInfo> initCols(final Schema schema) {
- ImmutableList.Builder<ProjectedColumnInfo> pciBuilder = ImmutableList.builder();
- ColumnDto column;
-
- for (int i = 0; i < schema.getNumberOfColumns(); i++) {
- column = schema.getColumnByIndex(i);
-
- final String name = column.getColumnName().toLowerCase();
- final PcapTypes type = column.getColumnType();
- TypeProtos.MinorType minorType = TYPES.get(type);
-
- ProjectedColumnInfo pci = getProjectedColumnInfo(column, name, minorType);
- pciBuilder.add(pci);
- }
- return pciBuilder.build();
- }
-
- private ProjectedColumnInfo getProjectedColumnInfo(final ColumnDto column,
- final String name,
- final MinorType minorType) {
- TypeProtos.MajorType majorType = getMajorType(minorType);
-
- MaterializedField field =
- MaterializedField.create(name, majorType);
-
- ValueVector vector =
- getValueVector(minorType, majorType, field);
-
- return getProjectedColumnInfo(column, vector);
- }
-
- private ProjectedColumnInfo getProjectedColumnInfo(final ColumnDto column, final ValueVector vector) {
- ProjectedColumnInfo pci = new ProjectedColumnInfo();
- pci.vv = vector;
- pci.pcapColumn = column;
- return pci;
- }
-
- private MajorType getMajorType(final MinorType minorType) {
- return Types.optional(minorType);
- }
-
- private ValueVector getValueVector(final MinorType minorType,
- final MajorType majorType,
- final MaterializedField field) {
- try {
-
- final Class<? extends ValueVector> clazz = TypeHelper.getValueVectorClass(
- minorType, majorType.getMode());
- ValueVector vector = output.addField(field, clazz);
- vector.allocateNew();
- return vector;
-
- } catch (SchemaChangeException sce) {
- throw new IllegalStateException("The addition of this field is incompatible with this OutputMutator's capabilities");
- }
- }
-
- private int parsePcapFilesAndPutItToTable() throws IOException {
- Packet packet = new Packet();
- int counter = 0;
- while (offset < validBytes && counter < BATCH_SIZE) {
- if (validBytes - offset < decoder.getMaxLength()) {
- if (validBytes == buffer.length) {
- // shift data and read more. This is the common case.
- System.arraycopy(buffer, offset, buffer, 0, validBytes - offset);
- validBytes = validBytes - offset;
- offset = 0;
-
- int n = in.read(buffer, validBytes, buffer.length - validBytes);
- if (n > 0) {
- validBytes += n;
- }
- logger.info("read {} bytes, at {} offset", n, validBytes);
- } else {
- // near the end of the file, we will just top up the buffer without shifting
- int n = in.read(buffer, offset, buffer.length - offset);
- if (n > 0) {
- validBytes = validBytes + n;
- logger.info("Topped up buffer with {} bytes to yield {}\n", n, validBytes);
- }
- }
- }
- int old = offset;
- offset = decoder.decodePacket(buffer, offset, packet, decoder.getMaxLength(), validBytes);
- if (offset > validBytes) {
- //Start here...
- logger.error("Invalid packet at offset {}", old);
- }
-
- if (addDataToTable(packet, decoder.getNetwork(), counter)) {
- counter++;
- }
- }
- return counter;
- }
-
- private boolean addDataToTable(final Packet packet, final int networkType, final int count) {
- for (ProjectedColumnInfo pci : projectedCols) {
- switch (pci.pcapColumn.getColumnName()) {
- case "type":
- setStringColumnValue(packet.getPacketType(), pci, count);
- break;
- case "timestamp":
- setTimestampColumnValue(packet.getTimestamp(), pci, count);
- break;
- case "timestamp_micro":
- setLongColumnValue(packet.getTimestampMicro(), pci, count);
- break;
- case "network":
- setIntegerColumnValue(networkType, pci, count);
- break;
- case "src_mac_address":
- setStringColumnValue(packet.getEthernetSource(), pci, count);
- break;
- case "dst_mac_address":
- setStringColumnValue(packet.getEthernetDestination(), pci, count);
- break;
- case "dst_ip":
- if (packet.getDst_ip() != null) {
- setStringColumnValue(packet.getDst_ip().getHostAddress(), pci, count);
- } else {
- setStringColumnValue(null, pci, count);
- }
- break;
- case "src_ip":
- if (packet.getSrc_ip() != null) {
- setStringColumnValue(packet.getSrc_ip().getHostAddress(), pci, count);
- } else {
- setStringColumnValue(null, pci, count);
- }
- break;
- case "src_port":
- setIntegerColumnValue(packet.getSrc_port(), pci, count);
- break;
- case "dst_port":
- setIntegerColumnValue(packet.getDst_port(), pci, count);
- break;
- case "tcp_session":
- if (packet.isTcpPacket()) {
- setLongColumnValue(packet.getSessionHash(), pci, count);
- }
- break;
- case "tcp_sequence":
- if (packet.isTcpPacket()) {
- setIntegerColumnValue(packet.getSequenceNumber(), pci, count);
- }
- break;
- case "tcp_ack":
- if (packet.isTcpPacket()) {
- setBooleanColumnValue(packet.getAckNumber(), pci, count);
- }
- break;
- case "tcp_flags":
- if (packet.isTcpPacket()) {
- setIntegerColumnValue(packet.getFlags(), pci, count);
- }
- break;
- case "tcp_parsed_flags":
- if (packet.isTcpPacket()) {
- setStringColumnValue(packet.getParsedFlags(), pci, count);
- }
- break;
- case "tcp_flags_ns":
- if (packet.isTcpPacket()) {
- setBooleanColumnValue((packet.getFlags() & 0x100) != 0, pci, count);
- }
- break;
- case "tcp_flags_cwr":
- if (packet.isTcpPacket()) {
- setBooleanColumnValue((packet.getFlags() & 0x80) != 0, pci, count);
- }
- break;
- case "tcp_flags_ece ":
- if (packet.isTcpPacket()) {
- setBooleanColumnValue((packet.getFlags() & 0x40) != 0, pci, count);
- }
- break;
- case "tcp_flags_ece_ecn_capable":
- if (packet.isTcpPacket()) {
- setBooleanColumnValue((packet.getFlags() & 0x42) == 0x42, pci, count);
- }
- break;
- case "tcp_flags_ece_congestion_experienced":
- if (packet.isTcpPacket()) {
- setBooleanColumnValue((packet.getFlags() & 0x42) == 0x40, pci, count);
- }
- break;
- case "tcp_flags_urg":
- if (packet.isTcpPacket()) {
- setBooleanColumnValue((packet.getFlags() & 0x20) != 0, pci, count);
- }
- break;
- case "tcp_flags_ack":
- if (packet.isTcpPacket()) {
- setBooleanColumnValue((packet.getFlags() & 0x10) != 0, pci, count);
- }
- break;
- case "tcp_flags_psh":
- if (packet.isTcpPacket()) {
- setBooleanColumnValue((packet.getFlags() & 0x8) != 0, pci, count);
- }
- break;
- case "tcp_flags_rst":
- if (packet.isTcpPacket()) {
- setBooleanColumnValue((packet.getFlags() & 0x4) != 0, pci, count);
- }
- break;
- case "tcp_flags_syn":
- if (packet.isTcpPacket()) {
- setBooleanColumnValue((packet.getFlags() & 0x2) != 0, pci, count);
- }
- break;
- case "tcp_flags_fin":
- if (packet.isTcpPacket()) {
- setBooleanColumnValue((packet.getFlags() & 0x1) != 0, pci, count);
- }
- break;
- case "packet_length":
- setIntegerColumnValue(packet.getPacketLength(), pci, count);
- break;
- case "is_corrupt":
- setBooleanColumnValue(packet.isCorrupt(), pci, count);
- break;
- case "data":
- if (packet.getData() != null) {
- setStringColumnValue(parseBytesToASCII(packet.getData()), pci, count);
- } else {
- setStringColumnValue("[]", pci, count);
- }
- break;
- }
- }
- return true;
- }
-
- private void setLongColumnValue(long data, ProjectedColumnInfo pci, final int count) {
- ((NullableBigIntVector.Mutator) pci.vv.getMutator())
- .setSafe(count, data);
- }
-
- private void setIntegerColumnValue(final int data, final ProjectedColumnInfo pci, final int count) {
- ((NullableIntVector.Mutator) pci.vv.getMutator())
- .setSafe(count, data);
- }
-
- private void setBooleanColumnValue(final boolean data, final ProjectedColumnInfo pci, final int count) {
- ((NullableBitVector.Mutator) pci.vv.getMutator())
- .setSafe(count, data ? 1 : 0);
- }
-
- private void setBooleanColumnValue(final int data, final ProjectedColumnInfo pci, final int count) {
- ((NullableBitVector.Mutator) pci.vv.getMutator())
- .setSafe(count, data);
- }
-
- private void setTimestampColumnValue(final long data, final ProjectedColumnInfo pci, final int count) {
- ((NullableTimeStampVector.Mutator) pci.vv.getMutator())
- .setSafe(count, data);
- }
-
- private void setStringColumnValue(final String data, final ProjectedColumnInfo pci, final int count) {
- if (data == null) {
- ((NullableVarCharVector.Mutator) pci.vv.getMutator())
- .setNull(count);
- } else {
- ByteBuffer value = ByteBuffer.wrap(data.getBytes(UTF_8));
- ((NullableVarCharVector.Mutator) pci.vv.getMutator())
- .setSafe(count, value, 0, value.remaining());
- }
- }
-
- @Override
- public String toString() {
- return "PcapRecordReader[File=" + pathToFile.toUri() + "]";
- }
-}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/Packet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/Packet.java
index 803a779..7d49699 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/Packet.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/Packet.java
@@ -33,7 +33,7 @@ import static org.apache.drill.exec.store.pcap.PcapFormatUtils.getByte;
import static org.apache.drill.exec.store.pcap.PcapFormatUtils.getIntFileOrder;
import static org.apache.drill.exec.store.pcap.PcapFormatUtils.getShort;
-public class Packet {
+public class Packet implements Comparable<Packet> {
// pcap header
// typedef struct pcaprec_hdr_s {
// guint32 ts_sec; // timestamp seconds
@@ -223,6 +223,42 @@ public class Packet {
isCorrupt = value;
}
+ public boolean getUrgFlag() {
+ return (getFlags() & 0x20) != 0;
+ }
+
+ public boolean getPshFlag() {
+ return (getFlags() & 0x8) != 0;
+ }
+
+ public boolean getEceFlag() {
+ return (getFlags() & 0x40) != 0;
+ }
+
+ public boolean getSynFlag() {
+ return (getFlags() & 0x2) != 0;
+ }
+
+ public boolean getAckFlag() {
+ return (getFlags() & 0x10) != 0;
+ }
+
+ public boolean getRstFlag() {
+ return (getFlags() & 0x4) != 0;
+ }
+
+ public boolean getFinFlag() {
+ return (getFlags() & 0x1) != 0;
+ }
+
+ public boolean getNSFlag() {
+ return (getFlags() & 0x100) != 0;
+ }
+
+ public boolean getCwrFlag() {
+ return (getFlags() & 0x80) != 0;
+ }
+
public static String formatFlags(int flags) {
int mask = 0x100;
StringBuilder r = new StringBuilder();
@@ -491,4 +527,15 @@ public class Packet {
int dstPortOffset = ipOffset + getIPHeaderLength() + offset;
return convertShort(raw, dstPortOffset);
}
+
+ /**
+ * This function is here so that packets can be sorted for re-sessionization. Packets in TCP streams
+ * are ordered by the sequence number, so being able to order the packets is necessary to reassemble the
+ * TCP session.
+ * @param o The packet to which the current packet is compared.
+ * @return Returns the difference in sequence number.
+ */
+ public int compareTo(Packet o) {
+ return this.getSequenceNumber() - (o).getSequenceNumber();
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/TcpHandshake.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/TcpHandshake.java
new file mode 100644
index 0000000..2fc3d05
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/TcpHandshake.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.pcap.decoder;
+
+/**
+ * This class records the status of the TCP handshake of one session. Initially this is used just to determine whether a session is open or closed, but
+ * future functionality could include SYN flood identification, or other hackery with TCP flags.
+ * <p>
+ * The state only changes through the setters; a setter called in a state it does not apply to leaves the state untouched:
+ * <ul>
+ * <li>{@code setSyn()}: NONE to SYN</li>
+ * <li>{@code setAck()}: SYN to SYNACK, SYNACK to OPEN (also marks the session as connected), CLOSE_WAIT to FIN_WAIT</li>
+ * <li>{@code setFin()}: OPEN to CLOSE_WAIT, CLOSE_WAIT to TIME_WAIT</li>
+ * <li>{@code setRst()}: any state to FORCED_CLOSED (also marks the session as not connected)</li>
+ * <li>{@code setConnected(long)}: any state to OPEN (also marks the session as connected and stores the session ID)</li>
+ * </ul>
+ */
+public class TcpHandshake {
+ private boolean isConnected = false;
+
+ private State currentSessionState = State.NONE;
+
+ private long sessionID; // written by setConnected(long); not read anywhere in this class
+
+ /**
+ * Returns true once the opening handshake (SYN, SYN/ACK, ACK) has been recorded or setConnected was called.
+ * The flag is cleared again by setRst() only; setFin() does not reset it.
+ *
+ * @return boolean true if the session has been opened and not reset, false if not.
+ */
+ public boolean isConnected() {
+ return isConnected;
+ }
+
+ /**
+ * This method returns true as soon as closing of the session has started or finished, i.e. when the current state is
+ * CLOSE_WAIT, FIN_WAIT, TIME_WAIT, CLOSED or FORCED_CLOSED. A single FIN or a RST is therefore already reported as closed.
+ *
+ * @return boolean true if the session is closing or closed, false if not.
+ */
+ public boolean isClosed() {
+ if (currentSessionState == State.CLOSE_WAIT ||
+ currentSessionState == State.FORCED_CLOSED ||
+ currentSessionState == State.CLOSED ||
+ currentSessionState == State.TIME_WAIT ||
+ currentSessionState == State.FIN_WAIT) {
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ public State getCurrentSessionState() {
+ return currentSessionState;
+ }
+
+ public void setConnected(long sessionID) {
+ this.sessionID = sessionID;
+ currentSessionState = State.OPEN;
+ isConnected = true;
+ }
+
+ public void setRst() {
+ isConnected = false;
+ currentSessionState = State.FORCED_CLOSED;
+ }
+
+ public void setFin() {
+ if (currentSessionState == State.OPEN) {
+ currentSessionState = State.CLOSE_WAIT; // First FIN seen; the other party is expected to answer with an ACK and its own FIN
+ } else if (currentSessionState == State.CLOSE_WAIT) {
+ currentSessionState = State.TIME_WAIT;
+ }
+ }
+
+ public void setAck() {
+ if (currentSessionState == State.SYN) {
+ currentSessionState = State.SYNACK;
+ } else if (currentSessionState == State.SYNACK) {
+ currentSessionState = State.OPEN;
+ isConnected = true;
+ } else if (currentSessionState == State.CLOSE_WAIT) {
+ currentSessionState = State.FIN_WAIT;
+ }
+ }
+
+ public void setSyn() {
+ if (currentSessionState == State.NONE) {
+ currentSessionState = State.SYN;
+ }
+ }
+
+ /**
+ * This enum represents the various states of the TCP handshake
+ */
+ enum State {
+ /**
+ * The NONE state is the initialization state. No session has been established
+ */
+ NONE,
+ /**
+ * The OPEN state represents a successfully opened TCP session. It is established in the final step in the TCP
+ * handshake.
+ */
+ OPEN,
+ /**
+ * The CLOSED state represents a closed TCP session, i.e. the state after the final ACK of the 4 way close process.
+ * None of the setters in this class currently assigns this state.
+ */
+ CLOSED,
+ /**
+ * The CLOSE_WAIT state represents a session in which one party has sent a frame with a FIN flag set.
+ * At this point resources can be released, however, to fully close the session the other party needs to send a frame
+ * with the ACK flag set.
+ */
+ CLOSE_WAIT,
+ /**
+ * This state is entered when a second FIN is recorded while the session is in the CLOSE_WAIT state.
+ */
+ TIME_WAIT,
+ /**
+ * The SYN state represents the first step in the TCP handshake. The originator has sent a frame with the SYN flag set.
+ * The next step would be the SYN/ACK stage.
+ */
+ SYN,
+ /**
+ * This state represents the second step of the TCP handshake in which the recipient acknowledges the originator's SYN
+ * flag.
+ */
+ SYNACK,
+ /**
+ * The FORCED_CLOSED state represents a session which was closed forcefully by a RST frame.
+ */
+ FORCED_CLOSED,
+ /**
+ * The FIN_WAIT state is entered when an ACK is recorded while the session is in the CLOSE_WAIT state, i.e. after the initial FIN.
+ */
+ FIN_WAIT
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/TcpSession.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/TcpSession.java
new file mode 100644
index 0000000..5900fd6
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/TcpSession.java
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.pcap.decoder;
+
+import org.joda.time.Instant;
+import org.joda.time.Period;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.drill.exec.store.pcap.PcapFormatUtils.parseBytesToASCII;
+
+/**
+ * This class is the representation of a TCP session.
+ */
+public class TcpSession {
+
+ private final List<Packet> packetsFromSender;
+ private final List<Packet> packetsFromReceiver;
+ private final TcpHandshake handshake;
+ private final long sessionID;
+
+ private long startTime;
+ private long endTime;
+ private int packetCount;
+ private InetAddress srcIP;
+ private InetAddress dstIP;
+ private int srcPort;
+ private int dstPort;
+ private String srcMac;
+ private String dstMac;
+ private long synTime;
+ private long ackTime;
+ private long connectTime;
+ private byte[] sentData;
+ private byte[] receivedData;
+ private int sentDataSize;
+ private int receivedDataSize;
+ private boolean hasCorruptedData = false;
+
+
+ private static final Logger logger = LoggerFactory.getLogger(TcpSession.class);
+
+ public TcpSession (long sessionID) {
+ packetsFromSender = new ArrayList<>();
+ packetsFromReceiver = new ArrayList<>();
+
+ handshake = new TcpHandshake();
+ this.sessionID = sessionID;
+ }
+
+ /**
+ * This function adds a packet to the TCP session.
+ * @param p The Packet to be added to the session
+ */
+ public void addPacket(Packet p) {
+
+ // Only attempt to add TCP packets to session
+ if (!p.getPacketType().equalsIgnoreCase("TCP")) {
+ return;
+ }
+
+ // These variables should be consistent within a TCP session
+ if (packetCount == 0) {
+ srcIP = p.getSrc_ip();
+ dstIP = p.getDst_ip();
+
+ srcPort = p.getSrc_port();
+ dstPort = p.getDst_port();
+
+ srcMac = p.getEthernetSource();
+ dstMac = p.getEthernetDestination();
+ startTime = p.getTimestamp();
+ } else if (p.getSessionHash() != sessionID) {
+ logger.warn("Attempting to add session {} to incorrect TCP session.", sessionID);
+ return;
+ }
+
+ // Add packet to appropriate list and increment the data size counter
+ if (p.getSrc_ip().getHostAddress().equalsIgnoreCase(srcIP.getHostAddress())) {
+ packetsFromSender.add(p);
+ // Increment the data size counters
+ if (p.getData() != null) {
+ sentDataSize += p.getData().length;
+ }
+
+ } else {
+ packetsFromReceiver.add(p);
+ if (p.getData() != null) {
+ receivedDataSize += p.getData().length;
+ }
+ }
+
+ // Check flags if connection is not established
+ if (!handshake.isConnected()) {
+ if (p.getSynFlag() && p.getSrc_ip().getHostAddress().equalsIgnoreCase(srcIP.getHostAddress())) {
+ // This is part 1 of the TCP session handshake
+ // The host sends the first SYN packet
+ handshake.setSyn();
+ synTime = p.getTimestamp();
+ } else if (p.getSynFlag() && p.getAckFlag() && p.getSrc_ip().getHostAddress().equalsIgnoreCase(dstIP.getHostAddress())) {
+ // This condition represents the second part of the TCP Handshake,
+ // where the receiver sends a frame with the SYN/ACK flags set to the originator
+ handshake.setAck();
+ } else if (p.getAckFlag() && p.getSrc_ip().getHostAddress().equalsIgnoreCase(srcIP.getHostAddress())) {
+ // Finally, this condition represents a successful opening of a TCP session, when the originator sends a frame with only the ACK flag set.
+ // At this point we finalize the session object and clear out the flags.
+ handshake.setAck();
+ ackTime = p.getTimestamp();
+ connectTime = ackTime - synTime;
+ //handshake.setConnected(sessionID);
+ }
+ } else {
+ /* Check for flags to close connection. Closing a TCP session is more difficult than opening a session and there are
+ * a lot of ways that it can go wrong. See https://accedian.com/enterprises/blog/close-tcp-sessions-diagnose-disconnections/ for references on
+ * closing TCP sessions.
+ *
+ * To close a session correctly, there are four steps:
+ * 1. Party A sends a frame to party B with the FIN flag
+ * 2. Party B sends a frame with an ACK flag
+ * 3. Party B then follows with a frame with the FIN flag set
+ * 4. Party A then confirms with an ACK flag.
+ *
+ * Technically, the session is closed upon the first FIN flag and resources can be released at that point. However, a lot can go wrong, so to force the closing of a
+ * session, either party can send a frame with a RST flag set which forces the closing of the session.
+ */
+
+ if (p.getRstFlag()) {
+ // This is the case for a forced closed connection. Session is immediately closed.
+ handshake.setRst();
+ } else if (p.getFinFlag()) {
+ // This is the beginning of the normal session closure procedure. If a FIN flag has been seen, the session is basically closed even if one party continues to send
+ // data,
+ handshake.setFin();
+ } else if (handshake.getCurrentSessionState() == TcpHandshake.State.CLOSE_WAIT) {
+
+ }
+ if (p.getAckFlag() && p.getFinFlag()) {
+ handshake.setAck();
+ }
+ }
+
+ // Augment the packet counter
+ packetCount++;
+
+ if (p.isCorrupt()) {
+ hasCorruptedData = true;
+ }
+
+ // Add the start and ending time stamp. The packets are not necessarily received in order, so we have to check the timestamps this way
+ if (p.getTimestampMicro() < startTime) {
+ startTime = p.getTimestamp();
+ }
+
+ if (p.getTimestampMicro() > endTime) {
+ endTime = p.getTimestamp();
+ }
+
+ // Close the session if the closing handshake is complete
+ if (handshake.isClosed()) {
+ closeSession();
+ }
+ }
+
+ /**
+ * This function returns true if the TCP session has been established, false if not.
+ * @return True if the session has been established, false if not.
+ */
+ public boolean connectionEstablished() {
+ return handshake.isConnected();
+ }
+
+ public boolean connectionClosed() {
+ return handshake.isClosed();
+ }
+
+ public void closeSession() {
+ logger.debug("Closing session {}", sessionID);
+ /* The sent and received bytes cannot be written until the session is closed.
+ * Upon receipt of the FIN->FIN/ACK handshake, write everything.
+ *
+ * Since it cannot be assumed that the packets were received in the correct order, we must:
+ * 1. Sort them by TCP Sequence Number
+ * 2. Write the data to the respective byte array
+ */
+
+ Collections.sort(packetsFromSender);
+ Collections.sort(packetsFromReceiver);
+
+ sentData = new byte[sentDataSize];
+ receivedData = new byte[receivedDataSize];
+
+ byte[] dataFromPacket;
+ int dataOffset = 0;
+ // Now that the lists are sorted, add packet data to the lists
+ for (int i = 0; i < packetsFromSender.size(); i++) {
+ // Get the packet;
+ Packet p = packetsFromSender.get(i);
+ dataFromPacket = p.getData();
+ if (dataFromPacket != null) {
+ for (int j = 0; j < dataFromPacket.length; j++) {
+ sentData[dataOffset] = dataFromPacket[j];
+ dataOffset++;
+ }
+ }
+ }
+
+ dataOffset = 0;
+ for (int i = 0; i < packetsFromReceiver.size(); i++) {
+ // Get the packet;
+ Packet p = packetsFromReceiver.get(i);
+ dataFromPacket = p.getData();
+ if (dataFromPacket != null) {
+ for (int j = 0; j < dataFromPacket.length; j++) {
+ receivedData[dataOffset] = dataFromPacket[j];
+ dataOffset++;
+ }
+ }
+ }
+ }
+
+ public Instant getSessionStartTime() {
+ return new Instant(startTime);
+ }
+
+ public Period getSessionDuration() {
+ return new Period(endTime - startTime);
+ }
+
+ public Period getConnectionTime() {
+ return new Period(connectTime);
+ }
+
+ public Instant getSessionEndTime() {
+ return new Instant(endTime);
+ }
+
+ public String getSrcMac() {
+ return srcMac;
+ }
+
+ public String getDstMac() {
+ return dstMac;
+ }
+
+ public String getSrcIP() {
+ return srcIP.getHostAddress();
+ }
+
+ public String getDstIP() {
+ return dstIP.getHostAddress();
+ }
+
+ public int getSrcPort() {
+ return srcPort;
+ }
+
+ public int getDstPort() {
+ return dstPort;
+ }
+
+ public long getSessionID() {
+ return sessionID;
+ }
+
+ public int getPacketCount() {
+ return packetsFromReceiver.size() + packetsFromSender.size();
+ }
+
+ public int getPacketCountFromOrigin() { return packetsFromSender.size(); }
+
+ public int getPacketCountFromRemote() { return packetsFromReceiver.size(); }
+
+ public boolean hasCorruptedData() {
+ return hasCorruptedData;
+ }
+
+ public int getDataVolumeFromOrigin() {
+ return sentData.length;
+ }
+
+ public int getDataVolumeFromRemote() {
+ return receivedData.length;
+ }
+
+ public byte[] getDataFromOriginator() {
+ return sentData;
+ }
+
+ public String getDataFromOriginatorAsString() {
+ return parseBytesToASCII(sentData);
+ }
+
+ public byte[] getDataFromRemote() {
+ return receivedData;
+ }
+
+ public String getDataFromRemoteAsString() {
+ return parseBytesToASCII(receivedData);
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/schema/PcapTypes.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/schema/PcapTypes.java
index fc6e029..c1fb4db 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/schema/PcapTypes.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/schema/PcapTypes.java
@@ -22,5 +22,6 @@ public enum PcapTypes {
INTEGER,
STRING,
LONG,
- TIMESTAMP
-}
\ No newline at end of file
+ TIMESTAMP,
+ DURATION
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/schema/Schema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/schema/Schema.java
index d88bc97..7655d26 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/schema/Schema.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/schema/Schema.java
@@ -30,8 +30,10 @@ public class Schema {
private final List<ColumnDto> columns = new ArrayList<>();
private final MinorType typeMap[] = new MinorType[PcapTypes.values().length];
+ private final boolean sessionizeTCPStreams;
- public Schema() {
+ public Schema(boolean sessionSchema) {
+ this.sessionizeTCPStreams = sessionSchema; // true registers the session-level columns, false the per-packet columns
setupStructure();
}
@@ -41,36 +43,58 @@ public class Schema {
typeMap[PcapTypes.STRING.ordinal()] = MinorType.VARCHAR;
typeMap[PcapTypes.LONG.ordinal()] = MinorType.BIGINT;
typeMap[PcapTypes.TIMESTAMP.ordinal()] = MinorType.TIMESTAMP;
+ typeMap[PcapTypes.DURATION.ordinal()] = MinorType.INTERVAL;
- columns.add(new ColumnDto("type", PcapTypes.STRING));
- columns.add(new ColumnDto("network", PcapTypes.INTEGER));
- columns.add(new ColumnDto("packet_timestamp", PcapTypes.TIMESTAMP));
- columns.add(new ColumnDto("timestamp_micro", PcapTypes.LONG));
+ // Common columns
columns.add(new ColumnDto("src_ip", PcapTypes.STRING));
columns.add(new ColumnDto("dst_ip", PcapTypes.STRING));
columns.add(new ColumnDto("src_port", PcapTypes.INTEGER));
columns.add(new ColumnDto("dst_port", PcapTypes.INTEGER));
columns.add(new ColumnDto("src_mac_address", PcapTypes.STRING));
columns.add(new ColumnDto("dst_mac_address", PcapTypes.STRING));
- columns.add(new ColumnDto("tcp_session", PcapTypes.LONG));
- columns.add(new ColumnDto("tcp_sequence", PcapTypes.INTEGER));
- columns.add(new ColumnDto("tcp_ack", PcapTypes.BOOLEAN));
- columns.add(new ColumnDto("tcp_flags", PcapTypes.INTEGER));
- columns.add(new ColumnDto("tcp_flags_ns", PcapTypes.BOOLEAN));
- columns.add(new ColumnDto("tcp_flags_cwr", PcapTypes.BOOLEAN));
- columns.add(new ColumnDto("tcp_flags_ece", PcapTypes.BOOLEAN ));
- columns.add(new ColumnDto("tcp_flags_ece_ecn_capable", PcapTypes.BOOLEAN ));
- columns.add(new ColumnDto("tcp_flags_ece_congestion_experienced", PcapTypes.BOOLEAN ));
- columns.add(new ColumnDto("tcp_flags_urg", PcapTypes.BOOLEAN ));
- columns.add(new ColumnDto("tcp_flags_ack", PcapTypes.BOOLEAN ));
- columns.add(new ColumnDto("tcp_flags_psh", PcapTypes.BOOLEAN ));
- columns.add(new ColumnDto("tcp_flags_rst", PcapTypes.BOOLEAN ));
- columns.add(new ColumnDto("tcp_flags_syn", PcapTypes.BOOLEAN ));
- columns.add(new ColumnDto("tcp_flags_fin", PcapTypes.BOOLEAN ));
- columns.add(new ColumnDto("tcp_parsed_flags", PcapTypes.STRING));
- columns.add(new ColumnDto("packet_length", PcapTypes.INTEGER));
- columns.add(new ColumnDto("is_corrupt", PcapTypes.BOOLEAN));
- columns.add(new ColumnDto("data", PcapTypes.STRING));
+
+ // Columns specific for Sessionized TCP Sessions
+ if (sessionizeTCPStreams) {
+ columns.add(new ColumnDto("session_start_time", PcapTypes.TIMESTAMP));
+ columns.add(new ColumnDto("session_end_time", PcapTypes.TIMESTAMP));
+ columns.add(new ColumnDto("session_duration", PcapTypes.DURATION));
+ columns.add(new ColumnDto("total_packet_count", PcapTypes.INTEGER));
+ columns.add(new ColumnDto("data_volume_from_origin", PcapTypes.INTEGER));
+ columns.add(new ColumnDto("data_volume_from_remote", PcapTypes.INTEGER));
+ columns.add(new ColumnDto("packet_count_from_origin", PcapTypes.INTEGER));
+ columns.add(new ColumnDto("packet_count_from_remote", PcapTypes.INTEGER));
+
+ columns.add(new ColumnDto("connection_time", PcapTypes.DURATION));
+ columns.add(new ColumnDto("tcp_session", PcapTypes.LONG));
+ columns.add(new ColumnDto("is_corrupt", PcapTypes.BOOLEAN));
+ columns.add(new ColumnDto("data_from_originator", PcapTypes.STRING));
+ columns.add(new ColumnDto("data_from_remote", PcapTypes.STRING));
+ } else {
+ // Columns for Regular Packets
+ columns.add(new ColumnDto("type", PcapTypes.STRING));
+ columns.add(new ColumnDto("network", PcapTypes.INTEGER));
+ columns.add(new ColumnDto("packet_timestamp", PcapTypes.TIMESTAMP));
+ columns.add(new ColumnDto("timestamp_micro", PcapTypes.LONG));
+ columns.add(new ColumnDto("tcp_session", PcapTypes.LONG));
+ columns.add(new ColumnDto("tcp_sequence", PcapTypes.INTEGER));
+ columns.add(new ColumnDto("tcp_ack", PcapTypes.BOOLEAN));
+ columns.add(new ColumnDto("tcp_flags", PcapTypes.INTEGER));
+ columns.add(new ColumnDto("tcp_flags_ns", PcapTypes.BOOLEAN));
+ columns.add(new ColumnDto("tcp_flags_cwr", PcapTypes.BOOLEAN));
+ columns.add(new ColumnDto("tcp_flags_ece", PcapTypes.BOOLEAN));
+ columns.add(new ColumnDto("tcp_flags_ece_ecn_capable", PcapTypes.BOOLEAN));
+ columns.add(new ColumnDto("tcp_flags_ece_congestion_experienced", PcapTypes.BOOLEAN));
+ columns.add(new ColumnDto("tcp_flags_urg", PcapTypes.BOOLEAN));
+ columns.add(new ColumnDto("tcp_flags_ack", PcapTypes.BOOLEAN));
+ columns.add(new ColumnDto("tcp_flags_psh", PcapTypes.BOOLEAN));
+ columns.add(new ColumnDto("tcp_flags_rst", PcapTypes.BOOLEAN));
+ columns.add(new ColumnDto("tcp_flags_syn", PcapTypes.BOOLEAN));
+ columns.add(new ColumnDto("tcp_flags_fin", PcapTypes.BOOLEAN));
+ columns.add(new ColumnDto("tcp_parsed_flags", PcapTypes.STRING));
+ columns.add(new ColumnDto("packet_length", PcapTypes.INTEGER));
+ columns.add(new ColumnDto("is_corrupt", PcapTypes.BOOLEAN));
+ columns.add(new ColumnDto("data", PcapTypes.STRING));
+ }
}
/**
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFormatPluginOptionExtractor.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFormatPluginOptionExtractor.java
index f43afb7..7721dc7 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFormatPluginOptionExtractor.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFormatPluginOptionExtractor.java
@@ -58,11 +58,13 @@ public class TestFormatPluginOptionExtractor {
break;
case "json":
case "sequencefile":
- case "pcap":
case "pcapng":
case "avro":
assertEquals(d.typeName, "(type: String)", d.presentParams());
break;
+ case "pcap":
+ assertEquals(d.typeName, "(type: String, sessionizeTCPStreams: boolean)", d.presentParams());
+ break;
case "httpd":
assertEquals("(type: String, logFormat: String, timestampFormat: String)", d.presentParams());
break;
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapDecoder.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapDecoder.java
index f0e89d9..54021d2 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapDecoder.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapDecoder.java
@@ -22,6 +22,8 @@ import org.apache.drill.test.BaseTestQuery;
import org.apache.drill.exec.store.pcap.decoder.Packet;
import org.apache.drill.exec.store.pcap.decoder.PacketDecoder;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.BufferedInputStream;
import java.io.DataOutputStream;
@@ -35,7 +37,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class TestPcapDecoder extends BaseTestQuery {
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestPcapDecoder.class);
+ private static final Logger logger = LoggerFactory.getLogger(TestPcapDecoder.class);
private static File bigFile;
@@ -61,7 +63,7 @@ public class TestPcapDecoder extends BaseTestQuery {
int offset = 0;
- byte[] buffer = new byte[PcapRecordReader.BUFFER_SIZE + pd.getMaxLength()];
+ byte[] buffer = new byte[PcapBatchReader.BUFFER_SIZE + pd.getMaxLength()];
int validBytes = in.read(buffer);
assertTrue(validBytes > 50);
@@ -168,7 +170,7 @@ public class TestPcapDecoder extends BaseTestQuery {
PacketDecoder pd = new PacketDecoder(in);
Packet p = pd.packet();
- byte[] buffer = new byte[PcapRecordReader.BUFFER_SIZE + pd.getMaxLength()];
+ byte[] buffer = new byte[PcapBatchReader.BUFFER_SIZE + pd.getMaxLength()];
int validBytes = in.read(buffer);
int offset = 0;
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapEVFReader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapEVFReader.java
index 246691e..5b07423 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapEVFReader.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapEVFReader.java
@@ -29,7 +29,6 @@ import org.apache.drill.test.ClusterTest;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-
import java.time.LocalDateTime;
import java.time.Month;
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestSessionizePCAP.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestSessionizePCAP.java
new file mode 100644
index 0000000..8c2818d
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestSessionizePCAP.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.pcap;
+
+
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.joda.time.Period;
+import java.nio.file.Paths;
+import java.time.LocalDateTime;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import java.time.format.DateTimeFormatter;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestSessionizePCAP extends ClusterTest { // Query tests for the pcap format with sessionizeTCPStreams enabled
+
+ private static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS");
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ ClusterTest.startCluster(ClusterFixture.builder(dirTestWatcher));
+
+ PcapFormatConfig sampleConfig = new PcapFormatConfig();
+ sampleConfig.sessionizeTCPStreams = true; // all tests in this class query reassembled TCP sessions, not single packets
+
+ cluster.defineFormat("cp", "pcap", sampleConfig);
+ dirTestWatcher.copyResourceToRoot(Paths.get("store/pcap/"));
+ }
+
+ @Test
+ public void testSessionizedStarQuery() throws Exception { // SELECT * must yield exactly the one expected session row
+ String sql = "SELECT * FROM cp.`/store/pcap/attack-trace.pcap` WHERE src_port=1821 AND dst_port=445";
+
+ testBuilder()
+ .sqlQuery(sql)
+ .ordered()
+ .baselineColumns("session_start_time", "session_end_time", "session_duration", "total_packet_count", "connection_time", "src_ip", "dst_ip", "src_port", "dst_port",
+ "src_mac_address", "dst_mac_address", "tcp_session", "is_corrupt", "data_from_originator", "data_from_remote", "data_volume_from_origin",
+ "data_volume_from_remote", "packet_count_from_origin", "packet_count_from_remote")
+ .baselineValues(LocalDateTime.parse("2009-04-20T03:28:28.374", formatter),
+ LocalDateTime.parse("2009-04-20T03:28:28.508", formatter),
+ Period.parse("PT0.134S"), 4,
+ Period.parse("PT0.119S"),
+ "98.114.205.102",
+ "192.150.11.111",
+ 1821, 445,
+ "00:08:E2:3B:56:01",
+ "00:30:48:62:4E:4A",
+ -8791568836279708938L,
+ false,
+ "........I....>...>..........Ib...<...<..........I....>...>", "", 62,0, 3, 1)
+ .go();
+ }
+
+ @Test
+ public void testSessionizedSpecificQuery() throws Exception { // same expected row as the star query, with every column projected explicitly
+ String sql = "SELECT session_start_time, session_end_time,session_duration, total_packet_count, connection_time, src_ip, dst_ip, src_port, dst_port, src_mac_address, dst_mac_address, tcp_session, " +
+ "is_corrupt, data_from_originator, data_from_remote, data_volume_from_origin, data_volume_from_remote, packet_count_from_origin, packet_count_from_remote " +
+ "FROM cp.`/store/pcap/attack-trace.pcap` WHERE src_port=1821 AND dst_port=445";
+
+ testBuilder()
+ .sqlQuery(sql)
+ .ordered()
+ .baselineColumns("session_start_time", "session_end_time", "session_duration", "total_packet_count", "connection_time", "src_ip", "dst_ip", "src_port", "dst_port",
+ "src_mac_address", "dst_mac_address", "tcp_session", "is_corrupt", "data_from_originator", "data_from_remote", "data_volume_from_origin",
+ "data_volume_from_remote", "packet_count_from_origin", "packet_count_from_remote")
+ .baselineValues(LocalDateTime.parse("2009-04-20T03:28:28.374", formatter),
+ LocalDateTime.parse("2009-04-20T03:28:28.508", formatter),
+ Period.parse("PT0.134S"), 4,
+ Period.parse("PT0.119S"),
+ "98.114.205.102",
+ "192.150.11.111",
+ 1821, 445,
+ "00:08:E2:3B:56:01",
+ "00:30:48:62:4E:4A",
+ -8791568836279708938L,
+ false,
+ "........I....>...>..........Ib...<...<..........I....>...>", "", 62,0, 3, 1)
+ .go();
+ }
+
+ @Test
+ public void testSerDe() throws Exception { // the plan must survive a round trip through its JSON form
+ String sql = "SELECT COUNT(*) FROM cp.`/store/pcap/attack-trace.pcap`";
+ String plan = queryBuilder().sql(sql).explainJson(); // obtain the plan as JSON ...
+ long cnt = queryBuilder().physical(plan).singletonLong(); // ... and execute that JSON as a physical plan
+ assertEquals("Counts should match", 5L, cnt); // five rows are expected for attack-trace.pcap
+ }
+}
diff --git a/exec/java-exec/src/test/resources/store/pcap/attack-trace.pcap b/exec/java-exec/src/test/resources/store/pcap/attack-trace.pcap
new file mode 100644
index 0000000..68e1fff
Binary files /dev/null and b/exec/java-exec/src/test/resources/store/pcap/attack-trace.pcap differ