Posted to dev@drill.apache.org by GitBox <gi...@apache.org> on 2021/03/25 21:13:51 UTC

[GitHub] [drill] paul-rogers commented on a change in pull request #2192: DRILL-7828: Refactor Pcap and Pcapng format plugin

paul-rogers commented on a change in pull request #2192:
URL: https://github.com/apache/drill/pull/2192#discussion_r601836846



##########
File path: contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/plugin/PcapFormatConfig.java
##########
@@ -29,18 +29,23 @@
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 
-@JsonTypeName(PcapngFormatConfig.NAME)
+@JsonTypeName(PcapFormatConfig.NAME)
 @JsonInclude(JsonInclude.Include.NON_DEFAULT)
-public class PcapngFormatConfig implements FormatPluginConfig {
+public class PcapFormatConfig implements FormatPluginConfig {
+  private static final List<String> DEFAULT_EXTNS = ImmutableList.of("pcap", "pcapng");
 
-  public static final String NAME = "pcapng";
+  public static final String NAME = "pcap";
   private final List<String> extensions;
   private final boolean stat;
+  private final boolean sessionizeTCPStreams;
 
   @JsonCreator
-  public PcapngFormatConfig(@JsonProperty("extensions") List<String> extensions, @JsonProperty("stat") boolean stat) {
-    this.extensions = extensions == null ? ImmutableList.of(PcapngFormatConfig.NAME) : ImmutableList.copyOf(extensions);
+  public PcapFormatConfig(@JsonProperty("extensions") List<String> extensions,
+                          @JsonProperty("stat") boolean stat,
+                          @JsonProperty("sessionizeTCPStreams") Boolean sessionizeTCPStreams) {

Review comment:
       People have to type this name. Brevity is good. Maybe just `sessionize` as the property name. Because this is PCAP, the only thing that *can* be sessionized is TCP streams.
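    For illustration only, a sketch of what the shorter property name might look like (class and field names here are hypothetical, not the actual patch):

```java
import java.util.List;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

// Hypothetical sketch of the shorter "sessionize" JSON property.
public class PcapConfigSketch {
  private final List<String> extensions;
  private final boolean stat;
  private final boolean sessionize;     // was: sessionizeTCPStreams

  @JsonCreator
  public PcapConfigSketch(@JsonProperty("extensions") List<String> extensions,
                          @JsonProperty("stat") boolean stat,
                          @JsonProperty("sessionize") Boolean sessionize) {
    this.extensions = extensions;
    this.stat = stat;
    // Boolean wrapper so an omitted property simply defaults to false
    this.sessionize = sessionize != null && sessionize;
  }
}
```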

##########
File path: contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/plugin/PcapFormatConfig.java
##########
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.exec.store.pcapng;
+package org.apache.drill.exec.store.plugin;

Review comment:
       It seems odd for Pcap to be renamed to `plugin`. I would have thought that `plugin` would be something more generic rather than one specific plugin. Suggestion: `pcap` as the package name.

##########
File path: contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/plugin/PcapFormatConfig.java
##########
@@ -29,18 +29,23 @@
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 
-@JsonTypeName(PcapngFormatConfig.NAME)
+@JsonTypeName(PcapFormatConfig.NAME)
 @JsonInclude(JsonInclude.Include.NON_DEFAULT)
-public class PcapngFormatConfig implements FormatPluginConfig {
+public class PcapFormatConfig implements FormatPluginConfig {
+  private static final List<String> DEFAULT_EXTNS = ImmutableList.of("pcap", "pcapng");
 
-  public static final String NAME = "pcapng";
+  public static final String NAME = "pcap";

Review comment:
       Note that changing this name may make existing systems unrecoverable. Drill cannot handle the case in which a plugin config stored in ZK changes names or structure (other than adding a field). This change will require all users of PCAP to delete all plugins in ZK and start over. Is that the intent? Or, will there be code to read the old format and rename it to the new format (which must be done prior to Jackson deserialization)?
   
   Is there a unit test to check this? Creating one is a bit tricky: you have to simulate the old format in ZK, then read it with the new code.
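    A rough sketch of such a test, assuming the old "pcapng" type name is simulated as raw JSON (names are illustrative; the real test would go through Drill's plugin persistence rather than a bare ObjectMapper):

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.drill.common.logical.FormatPluginConfig;
import org.junit.Assert;
import org.junit.Test;

// Sketch only: simulates a format config as an older Drill would have stored
// it in ZK, then tries to read it back with the current classes.
public class TestLegacyPcapngConfigSketch {

  private static final String LEGACY_JSON =
      "{\"type\":\"pcapng\", \"extensions\":[\"pcapng\"], \"stat\":false}";

  @Test
  public void legacyConfigStillDeserializes() throws Exception {
    ObjectMapper mapper = new ObjectMapper();
    // If the type name simply changes from "pcapng" to "pcap", this is the
    // step where deserialization of old stored configs would break.
    FormatPluginConfig config = mapper.readValue(LEGACY_JSON, FormatPluginConfig.class);
    Assert.assertNotNull(config);
  }
}
```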

##########
File path: contrib/format-pcapng/src/main/resources/bootstrap-format-plugins.json
##########
@@ -3,26 +3,29 @@
     "dfs": {
       "type": "file",
       "formats": {
-        "pcapng": {
-          "type": "pcapng",
+        "pcap": {
+          "type": "pcap",

Review comment:
       Breaking change; see above.

##########
File path: contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/PcapBatchReader.java
##########
@@ -48,126 +49,61 @@
   private static final Logger logger = LoggerFactory.getLogger(PcapBatchReader.class);
 
   private FileSplit split;
-
   private PacketDecoder decoder;
-
   private InputStream fsStream;
-
   private RowSetLoader rowWriter;
-
   private int validBytes;
-
   private byte[] buffer;
-
   private int offset;
-
   private ScalarWriter typeWriter;
-
   private ScalarWriter timestampWriter;
-
   private ScalarWriter timestampMicroWriter;
-
   private ScalarWriter networkWriter;
-
   private ScalarWriter srcMacAddressWriter;
-
   private ScalarWriter dstMacAddressWriter;
-
   private ScalarWriter dstIPWriter;
-
   private ScalarWriter srcIPWriter;
-
   private ScalarWriter srcPortWriter;
-
   private ScalarWriter dstPortWriter;
-
   private ScalarWriter packetLengthWriter;
-
   private ScalarWriter tcpSessionWriter;
-
   private ScalarWriter tcpSequenceWriter;
-
   private ScalarWriter tcpAckNumberWriter;
-
   private ScalarWriter tcpFlagsWriter;
-
   private ScalarWriter tcpParsedFlagsWriter;
-
   private ScalarWriter tcpNsWriter;
-
   private ScalarWriter tcpCwrWriter;
-
   private ScalarWriter tcpEceWriter;
-
   private ScalarWriter tcpFlagsEceEcnCapableWriter;
-
   private ScalarWriter tcpFlagsCongestionWriter;
-
   private ScalarWriter tcpUrgWriter;
-
   private ScalarWriter tcpAckWriter;
-
   private ScalarWriter tcpPshWriter;
-
   private ScalarWriter tcpRstWriter;
-
   private ScalarWriter tcpSynWriter;
-
   private ScalarWriter tcpFinWriter;
-
   private ScalarWriter dataWriter;
-
   private ScalarWriter isCorruptWriter;
-
-  private final PcapReaderConfig readerConfig;
-
-
+  private final PcapFormatConfig readerConfig;
   // Writers for TCP Sessions
   private ScalarWriter sessionStartTimeWriter;
-
   private ScalarWriter sessionEndTimeWriter;
-
   private ScalarWriter sessionDurationWriter;
-
   private ScalarWriter connectionTimeWriter;
-
   private ScalarWriter packetCountWriter;
-
   private ScalarWriter originPacketCounterWriter;
-
   private ScalarWriter remotePacketCounterWriter;
-
   private ScalarWriter originDataVolumeWriter;
-
   private ScalarWriter remoteDataVolumeWriter;
-
   private ScalarWriter hostDataWriter;
-
   private ScalarWriter remoteDataWriter;
-
   private final int maxRecords;
-
   private Map<Long, TcpSession> sessionQueue;
 
 
-  public static class PcapReaderConfig {
-
-    protected final PcapFormatPlugin plugin;
-
-    public boolean sessionizeTCPStreams;
-
-    private final PcapFormatConfig config;
-
-    public PcapReaderConfig(PcapFormatPlugin plugin) {
-      this.plugin = plugin;
-      this.config = plugin.getConfig();
-      this.sessionizeTCPStreams = config.getSessionizeTCPStreams();
-    }
-  }
-
-  public PcapBatchReader(PcapReaderConfig readerConfig, int maxRecords) {
+  public PcapBatchReader(PcapFormatConfig readerConfig, int maxRecords) {
     this.readerConfig = readerConfig;
-    if (readerConfig.sessionizeTCPStreams) {
+    if (readerConfig.getSessionizeTCPStreams()) {

Review comment:
       Nit: no real need for a getter. The semantics of plugins require that the fields be public. A getter is needed only if we want to do something to that field.
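    As a tiny illustration (hypothetical names, not the actual code), the public-field style looks like:

```java
// Sketch of the public-field convention for plugin configs; names are
// illustrative only.
public class SessionizeFieldSketch {

  static class ConfigSketch {
    // Public field, no getter: plugin configs are simple data holders.
    public final boolean sessionizeTCPStreams;

    ConfigSketch(boolean sessionizeTCPStreams) {
      this.sessionizeTCPStreams = sessionizeTCPStreams;
    }
  }

  public static void main(String[] args) {
    ConfigSketch config = new ConfigSketch(true);
    // The reader can test the field directly instead of calling a getter.
    if (config.sessionizeTCPStreams) {
      System.out.println("sessionizing TCP streams");
    }
  }
}
```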

##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsScanFramework.java
##########
@@ -103,7 +103,7 @@ protected void configure() {
   }
 
   @Override
-  protected SchemaNegotiatorImpl newNegotiator() {
+  public SchemaNegotiatorImpl newNegotiator() {

Review comment:
       Why is this now `public`? The `columns` framework was for CSV files using the `columns` column; this is an internal bit of structure not meant to be overridden.
   
    Also, it would be better to invest time in the "new" version rather than this somewhat cumbersome older version.

##########
File path: contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/plugin/PcapFormatPlugin.java
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.plugin;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileReaderFactory;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileScanBuilder;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedScanFramework;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import org.apache.drill.exec.store.pcap.PcapBatchReader;
+import org.apache.drill.exec.store.pcap.decoder.PacketDecoder;
+import org.apache.drill.exec.store.pcapng.PcapngBatchReader;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.regex.Pattern;
+
+public class PcapFormatPlugin extends EasyFormatPlugin<PcapFormatConfig> {
+
+  static final Logger logger = LoggerFactory.getLogger(ManagedScanFramework.class);
+  private static PacketDecoder.FileFormat fileFormat = PacketDecoder.FileFormat.UNKNOWN;
+
+  public PcapFormatPlugin(String name,
+                          DrillbitContext context,
+                          Configuration fsConf,
+                          StoragePluginConfig storageConfig,
+                          PcapFormatConfig formatConfig) {
+    super(name, easyConfig(fsConf, formatConfig), context, storageConfig, formatConfig);
+  }
+
+  private static EasyFormatConfig easyConfig(Configuration fsConf, PcapFormatConfig pluginConfig) {
+    return EasyFormatConfig.builder()
+        .readable(true)
+        .writable(false)
+        .blockSplittable(false)
+        .compressible(true)
+        .extensions(pluginConfig.getExtensions())
+        .fsConf(fsConf)
+        .useEnhancedScan(true)
+        .supportsLimitPushdown(true)
+        .supportsProjectPushdown(true)
+        .defaultName(PcapFormatConfig.NAME)
+        .build();
+  }
+
+  private static class PcapReaderFactory extends FileReaderFactory {
+
+    private final PcapFormatConfig config;
+    private final EasySubScan scan;
+
+    public PcapReaderFactory(PcapFormatConfig config, EasySubScan scan) {
+      this.config = config;
+      this.scan = scan;
+    }
+
+    @Override
+    public ManagedReader<? extends FileSchemaNegotiator> newReader() {
+      fileFormat = fromMagicNumber(fileFramework);
+      return createReader(scan, config);
+    }
+  }
+
+  @Override
+  public ManagedReader<? extends FileSchemaNegotiator> newBatchReader(EasySubScan scan, OptionManager options) {
+    return createReader(scan, formatConfig);
+  }
+
+  private static ManagedReader<? extends FileSchemaNegotiator> createReader(EasySubScan scan, PcapFormatConfig config) {
+    switch(fileFormat) {
+      case PCAPNG: return new PcapngBatchReader(config, scan);
+      case PCAP:
+      case UNKNOWN:
+      default: return new PcapBatchReader(config, scan.getMaxRecords());
+    }
+  }
+
+  @Override
+  protected FileScanBuilder frameworkBuilder(OptionManager options, EasySubScan scan) {
+    FileScanBuilder builder = new FileScanBuilder();
+    builder.setReaderFactory(new PcapReaderFactory(formatConfig, scan));
+
+    initScanBuilder(builder, scan);
+    builder.nullType(Types.optional(MinorType.VARCHAR));
+    return builder;
+  }
+
+  /**
+   * Helper method to detect PCAP or PCAPNG file format based on file Magic Number
+   *
+   * @param fileFramework for obtaining InputStream
+   * @return PCAP/PCAPNG file format
+   */
+  private static PacketDecoder.FileFormat fromMagicNumber(FileScanFramework fileFramework) {
+    FileScanFramework.FileSchemaNegotiatorImpl negotiator = (FileScanFramework.FileSchemaNegotiatorImpl) fileFramework.newNegotiator();
+    DrillFileSystem dfs = negotiator.fileSystem();
+    Path path = dfs.makeQualified(negotiator.split().getPath());
+    try (InputStream inputStream = dfs.openPossiblyCompressedStream(path)) {
+      PacketDecoder decoder = new PacketDecoder(inputStream);
+      return decoder.getFileFormat();
+    } catch (IOException io) {
+      throw UserException
+              .dataReadError(io)
+              .addContext("File name:", path.toString())
+              .build(logger);
+    }
+  }
+
+  /**
+   * Detects PCAP or PCAPNG file format based on file extension. Not used now due to {@link #fromMagicNumber} usage for
+   * this purpose

Review comment:
       The Drill convention has been to remove unused code.

##########
File path: contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/plugin/PcapFormatPlugin.java
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.plugin;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileReaderFactory;

Review comment:
       At some point it would be good to start using the "new" version of this stuff, which is considerably simpler and more forward-looking than this original version. For example, the "new" version provides better options for working with provided schemas.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org