You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by GitBox <gi...@apache.org> on 2018/07/18 13:08:56 UTC

[GitHub] Vlad-Storona commented on a change in pull request #1126: DRILL-6179: Added pcapng-format support

Vlad-Storona commented on a change in pull request #1126: DRILL-6179: Added pcapng-format support
URL: https://github.com/apache/drill/pull/1126#discussion_r203371117
 
 

 ##########
 File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngRecordReader.java
 ##########
 @@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.pcapng;
+
+import com.google.common.collect.ImmutableList;
+import fr.bmartel.pcapdecoder.PcapDecoder;
+import fr.bmartel.pcapdecoder.structure.types.IPcapngType;
+import fr.bmartel.pcapdecoder.structure.types.inter.IEnhancedPacketBLock;
+import org.apache.commons.io.IOUtils;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.store.pcapng.schema.Schema;
+import org.apache.drill.exec.store.pcapng.schema.columns.Column;
+import org.apache.drill.exec.store.pcapng.schema.columns.DummyImpl;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+public class PcapngRecordReader extends AbstractRecordReader {
+  private static final Logger logger = LoggerFactory.getLogger(PcapngRecordReader.class);
+
+  // batch size should not exceed max allowed record count
+  private static final int BATCH_SIZE = 40_000;
+
+  private final Path pathToFile;
+  private OutputMutator output;
+  private ImmutableList<ProjectedColumnInfo> projectedCols;
+  private FileSystem fs;
+  private FSDataInputStream in;
+  private List<SchemaPath> columns;
+
+  private Iterator<IPcapngType> it;
+
+  public PcapngRecordReader(final String pathToFile,
+                            final FileSystem fileSystem,
+                            final List<SchemaPath> columns) {
+    this.fs = fileSystem;
+    this.pathToFile = fs.makeQualified(new Path(pathToFile));
+    this.columns = columns;
+    setColumns(columns);
+  }
+
+  @Override
+  public void setup(final OperatorContext context, final OutputMutator output) throws ExecutionSetupException {
+    try {
+
+      this.output = output;
+      this.in = fs.open(pathToFile);
+      PcapDecoder decoder = new PcapDecoder(IOUtils.toByteArray(in));
+      decoder.decode();
+      this.it = decoder.getSectionList().iterator();
+      setupProjection();
+    } catch (IOException io) {
+      throw UserException.dataReadError(io)
+          .addContext("File name:", pathToFile.toUri().getPath())
+          .build(logger);
+    }
+  }
+
+  /**
+   * Fetches the next batch of packet blocks.
+   *
+   * @return the size of the batch for a skip query; otherwise the count returned
+   *         after the batch has been written through the projected columns
+   */
+  @Override
+  public int next() {
+    if (!isSkipQuery()) {
+      return putToTable(getBatchOfBlocks());
+    }
+    // Skip query: the blocks only need to be counted.
+    return getBatchOfBlocks().size();
+  }
+
+  private int putToTable(final List<IEnhancedPacketBLock> batchOfBlocks) {
+    int counter = 0;
+    for (IEnhancedPacketBLock bLock : batchOfBlocks) {
+      for (ProjectedColumnInfo pci : projectedCols) {
+        pci.getColumn().process(bLock, pci.getVv(), counter);
+      }
+      counter++;
+    }
+    return counter;
+  }
+
+  /**
+   * Closes the input stream if one has been opened.
+   *
+   * @throws Exception if closing the stream fails
+   */
+  @Override
+  public void close() throws Exception {
+    if (in == null) {
+      // No stream was assigned, so there is nothing to release.
+      return;
+    }
+    in.close();
+  }
+
+  /**
+   * Chooses the projected columns according to the query type: a dummy column for a
+   * skip query, all schema columns for a star query, otherwise the requested columns.
+   */
+  private void setupProjection() {
+    if (isSkipQuery()) {
+      projectedCols = projectNone();
+      return;
+    }
+    if (isStarQuery()) {
+      projectedCols = projectAllCols(Schema.getColumnsNames());
+      return;
+    }
+    projectedCols = projectCols(columns);
+  }
+
+  private ImmutableList<ProjectedColumnInfo> projectNone() {
+    ImmutableList.Builder<ProjectedColumnInfo> pciBuilder = ImmutableList.builder();
+    pciBuilder.add(makeColumn("dummy", new DummyImpl()));
+    return pciBuilder.build();
+  }
+
+  private ImmutableList<ProjectedColumnInfo> projectAllCols(final Set<String> columns) {
+    ImmutableList.Builder<ProjectedColumnInfo> pciBuilder = ImmutableList.builder();
+    for (String colName : columns) {
+      pciBuilder.add(makeColumn(colName, Schema.getColumns().get(colName)));
+    }
+    return pciBuilder.build();
+  }
+
+  /**
+   * Builds a projection for the explicitly requested columns.
+   * Requested names that are not found in the schema are skipped.
+   *
+   * @param columns schema paths requested by the query; only the root name is used
+   * @return immutable list of projected columns, named as they were requested
+   */
+  private ImmutableList<ProjectedColumnInfo> projectCols(final List<SchemaPath> columns) {
+    ImmutableList.Builder<ProjectedColumnInfo> pciBuilder = ImmutableList.builder();
+    for (SchemaPath schemaPath : columns) {
+      String projectedName = schemaPath.rootName();
+      // Lower-case with Locale.ROOT so the lookup key does not depend on the JVM
+      // default locale (e.g. the Turkish dotless i would otherwise break the match).
+      String lookupKey = projectedName.toLowerCase(java.util.Locale.ROOT);
+      // Single lookup instead of containsKey + get.
+      Column column = Schema.getColumns().get(lookupKey);
+      if (column != null) {
+        pciBuilder.add(makeColumn(projectedName, column));
+      }
+    }
+    return pciBuilder.build();
+  }
+
+  private ProjectedColumnInfo makeColumn(final String colName, final Column column) {
+    TypeProtos.MajorType majorType = Types.optional(column.getMinorType());
+
+    MaterializedField field =
+        MaterializedField.create(colName, majorType);
+
+    ValueVector vector =
+        getValueVector(column.getMinorType(), majorType, field, output);
+    return new ProjectedColumnInfo(vector, column, colName);
+  }
+
+  private ValueVector getValueVector(final TypeProtos.MinorType minorType,
+                                     final TypeProtos.MajorType majorType,
+                                     final MaterializedField field, final OutputMutator output) {
 
 Review comment:
   Thanks, I will simplify this.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services