You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by cgivre <gi...@git.apache.org> on 2018/02/06 13:39:57 UTC

[GitHub] drill pull request #1114: Drill-6104: Added Logfile Reader

GitHub user cgivre opened a pull request:

    https://github.com/apache/drill/pull/1114

    Drill-6104: Added Logfile Reader

    I would like to submit a format plugin that will enable Drill to read log files. Here is a link to the GitHub repo, which contains documentation:
    https://github.com/cgivre/drill-logfile-plugin
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/cgivre/drill format-logfile

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/drill/pull/1114.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1114
    
----
commit 20b9f185ae0f3d5e600813668345430c50984b0c
Author: cgivre <cg...@...>
Date:   2018-02-06T13:34:45Z

    Added Logfile Reader

----


---

[GitHub] drill pull request #1114: Drill-6104: Added Logfile Reader

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1114#discussion_r167131092
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java ---
    @@ -0,0 +1,261 @@
    +package org.apache.drill.exec.store.log;
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +import io.netty.buffer.DrillBuf;
    +import org.apache.drill.common.exceptions.ExecutionSetupException;
    +import org.apache.drill.common.exceptions.UserException;
    +import org.apache.drill.common.expression.SchemaPath;
    +import org.apache.drill.exec.exception.OutOfMemoryException;
    +import org.apache.drill.exec.ops.FragmentContext;
    +import org.apache.drill.exec.ops.OperatorContext;
    +import org.apache.drill.exec.physical.impl.OutputMutator;
    +import org.apache.drill.exec.store.AbstractRecordReader;
    +import org.apache.drill.exec.store.dfs.DrillFileSystem;
    +import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
    +import org.apache.drill.exec.vector.complex.writer.BaseWriter;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FSDataInputStream;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.io.compress.CompressionCodec;
    +import org.apache.hadoop.io.compress.CompressionCodecFactory;
    +import org.apache.hadoop.io.compress.CompressionInputStream;
    +
    +import java.io.BufferedReader;
    +import java.io.IOException;
    +import java.io.InputStreamReader;
    +import java.text.ParseException;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.regex.Matcher;
    +import java.util.regex.Pattern;
    +
    +public class LogRecordReader extends AbstractRecordReader {
    +
    +  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LogRecordReader.class);
    +  private static final int MAX_RECORDS_PER_BATCH = 8096;
    +
    +  private String inputPath;
    +  private BufferedReader reader;
    +  private DrillBuf buffer;
    +  private VectorContainerWriter writer;
    +  private LogFormatPlugin.LogFormatConfig config;
    +  private int lineCount;
    +  private Pattern r;
    +
    +  private List<String> fieldNames;
    +  private List<String> dataTypes;
    +  private boolean errorOnMismatch;
    +  private String dateFormat;
    +  private String timeFormat;
    +  private java.text.DateFormat df;
    +  private java.text.DateFormat tf;
    +  private long time;
    +
    +  public LogRecordReader(FragmentContext fragmentContext, String inputPath, DrillFileSystem fileSystem,
    +                         List<SchemaPath> columns, LogFormatPlugin.LogFormatConfig config) throws OutOfMemoryException {
    +    try {
    +      Path hdfsPath = new Path(inputPath);
    +      Configuration conf = new Configuration();
    +      FSDataInputStream fsStream = fileSystem.open(hdfsPath);
    +      CompressionCodecFactory factory = new CompressionCodecFactory(conf);
    +      CompressionCodec codec = factory.getCodec(hdfsPath);
    +      if (codec == null) {
    +        reader = new BufferedReader(new InputStreamReader(fsStream.getWrappedStream(), "UTF-8"));
    +      } else {
    +        CompressionInputStream comInputStream = codec.createInputStream(fsStream.getWrappedStream());
    +        reader = new BufferedReader(new InputStreamReader(comInputStream));
    +      }
    +      this.inputPath = inputPath;
    +      this.lineCount = 0;
    +      this.config = config;
    +      this.buffer = fragmentContext.getManagedBuffer(4096);
    +      setColumns(columns);
    +
    +    } catch (IOException e) {
    +      logger.debug("Log Reader Plugin: " + e.getMessage());
    +    }
    +  }
    +
    +  public void setup(final OperatorContext context, final OutputMutator output) throws ExecutionSetupException {
    +    this.writer = new VectorContainerWriter(output);
    +    String regex = config.getPattern();
    +
    +
    +    fieldNames = config.getFieldNames();
    +    dataTypes = config.getDataTypes();
    +    dateFormat = config.getDateFormat();
    +    timeFormat = config.getTimeFormat();
    +    errorOnMismatch = config.getErrorOnMismatch();
    +
    +    /*
    +    This section will check for;
    +    1.  Empty regex
    +    2.  Invalid Regex
    +    3.  Empty date string if the date format is used
    +    4.  No capturing groups in the regex
    +    5.  Incorrect number of data types
    +    6.  Invalid data types
    +     */
    +    if (regex.isEmpty()) {
    +      throw UserException.parseError().message("Log parser requires a valid, non-empty regex in the plugin configuration").build(logger);
    +    } else {
    +      //TODO Check for invalid regex
    +      r = Pattern.compile(regex);
    +      Matcher m = r.matcher("test");
    +      if (m.groupCount() == 0) {
    +        throw UserException.parseError().message("Invalid Regular Expression: No Capturing Groups", 0).build(logger);
    +      } else if (m.groupCount() != (fieldNames.size())) {
    +        throw UserException.parseError().message("Invalid Regular Expression: Field names do not match capturing groups.  There are " + m.groupCount() + " captured groups in the data and " + fieldNames.size() + " specified in the configuration.", 0).build(logger);
    +
    +      } else if ((dataTypes == null) || m.groupCount() != dataTypes.size()) {
    +        //If the number of data types is not correct, create a list of varchar
    +        dataTypes = new ArrayList<String>();
    +        for (int i = -0; i < m.groupCount(); i++) {
    +          dataTypes.add("VARCHAR");
    +        }
    +      }
    +    }
    +
    +    //Check and set up date formats
    +    if (dataTypes.contains("DATE") || dataTypes.contains("TIMESTAMP")) {
    +      if (dateFormat != null && !dateFormat.isEmpty()) {
    +        df = new java.text.SimpleDateFormat(dateFormat);
    +      } else {
    +        throw UserException.parseError().message("Invalid date format.  The date formatting string was empty.  Please specify a valid date format string in the configuration for this data source.", 0).build(logger);
    --- End diff --
    
    No need to include the `0` argument. It is only for filling in patterns in the message, as shown above. The same applies here and below.


---

[GitHub] drill pull request #1114: Drill-6104: Added Logfile Reader

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1114#discussion_r167129205
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java ---
    @@ -0,0 +1,261 @@
    +package org.apache.drill.exec.store.log;
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +import io.netty.buffer.DrillBuf;
    +import org.apache.drill.common.exceptions.ExecutionSetupException;
    +import org.apache.drill.common.exceptions.UserException;
    +import org.apache.drill.common.expression.SchemaPath;
    +import org.apache.drill.exec.exception.OutOfMemoryException;
    +import org.apache.drill.exec.ops.FragmentContext;
    +import org.apache.drill.exec.ops.OperatorContext;
    +import org.apache.drill.exec.physical.impl.OutputMutator;
    +import org.apache.drill.exec.store.AbstractRecordReader;
    +import org.apache.drill.exec.store.dfs.DrillFileSystem;
    +import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
    +import org.apache.drill.exec.vector.complex.writer.BaseWriter;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FSDataInputStream;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.io.compress.CompressionCodec;
    +import org.apache.hadoop.io.compress.CompressionCodecFactory;
    +import org.apache.hadoop.io.compress.CompressionInputStream;
    +
    +import java.io.BufferedReader;
    +import java.io.IOException;
    +import java.io.InputStreamReader;
    +import java.text.ParseException;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.regex.Matcher;
    +import java.util.regex.Pattern;
    +
    +public class LogRecordReader extends AbstractRecordReader {
    +
    +  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LogRecordReader.class);
    +  private static final int MAX_RECORDS_PER_BATCH = 8096;
    +
    +  private String inputPath;
    +  private BufferedReader reader;
    +  private DrillBuf buffer;
    +  private VectorContainerWriter writer;
    +  private LogFormatPlugin.LogFormatConfig config;
    +  private int lineCount;
    +  private Pattern r;
    +
    +  private List<String> fieldNames;
    +  private List<String> dataTypes;
    +  private boolean errorOnMismatch;
    +  private String dateFormat;
    +  private String timeFormat;
    +  private java.text.DateFormat df;
    +  private java.text.DateFormat tf;
    +  private long time;
    +
    +  public LogRecordReader(FragmentContext fragmentContext, String inputPath, DrillFileSystem fileSystem,
    +                         List<SchemaPath> columns, LogFormatPlugin.LogFormatConfig config) throws OutOfMemoryException {
    +    try {
    +      Path hdfsPath = new Path(inputPath);
    +      Configuration conf = new Configuration();
    +      FSDataInputStream fsStream = fileSystem.open(hdfsPath);
    +      CompressionCodecFactory factory = new CompressionCodecFactory(conf);
    +      CompressionCodec codec = factory.getCodec(hdfsPath);
    +      if (codec == null) {
    +        reader = new BufferedReader(new InputStreamReader(fsStream.getWrappedStream(), "UTF-8"));
    +      } else {
    +        CompressionInputStream comInputStream = codec.createInputStream(fsStream.getWrappedStream());
    +        reader = new BufferedReader(new InputStreamReader(comInputStream));
    --- End diff --
    
    Probably don't want to do this here. The `RecordReader` protocol is kind of awkward. Suppose you scan 1000 log files in a single thread. The Scan operator will create 1000 `RecordReader` instances, then process them one by one. We really don't want to open 1000 files at the same time. So, in the constructor, just get ready, but defer file opening until the `setup()` call.


---

[GitHub] drill pull request #1114: Drill-6104: Added Logfile Reader

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1114#discussion_r167130581
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java ---
    @@ -0,0 +1,261 @@
    +package org.apache.drill.exec.store.log;
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +import io.netty.buffer.DrillBuf;
    +import org.apache.drill.common.exceptions.ExecutionSetupException;
    +import org.apache.drill.common.exceptions.UserException;
    +import org.apache.drill.common.expression.SchemaPath;
    +import org.apache.drill.exec.exception.OutOfMemoryException;
    +import org.apache.drill.exec.ops.FragmentContext;
    +import org.apache.drill.exec.ops.OperatorContext;
    +import org.apache.drill.exec.physical.impl.OutputMutator;
    +import org.apache.drill.exec.store.AbstractRecordReader;
    +import org.apache.drill.exec.store.dfs.DrillFileSystem;
    +import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
    +import org.apache.drill.exec.vector.complex.writer.BaseWriter;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FSDataInputStream;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.io.compress.CompressionCodec;
    +import org.apache.hadoop.io.compress.CompressionCodecFactory;
    +import org.apache.hadoop.io.compress.CompressionInputStream;
    +
    +import java.io.BufferedReader;
    +import java.io.IOException;
    +import java.io.InputStreamReader;
    +import java.text.ParseException;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.regex.Matcher;
    +import java.util.regex.Pattern;
    +
    +public class LogRecordReader extends AbstractRecordReader {
    +
    +  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LogRecordReader.class);
    +  private static final int MAX_RECORDS_PER_BATCH = 8096;
    +
    +  private String inputPath;
    +  private BufferedReader reader;
    +  private DrillBuf buffer;
    +  private VectorContainerWriter writer;
    +  private LogFormatPlugin.LogFormatConfig config;
    +  private int lineCount;
    +  private Pattern r;
    +
    +  private List<String> fieldNames;
    +  private List<String> dataTypes;
    +  private boolean errorOnMismatch;
    +  private String dateFormat;
    +  private String timeFormat;
    +  private java.text.DateFormat df;
    +  private java.text.DateFormat tf;
    +  private long time;
    +
    +  public LogRecordReader(FragmentContext fragmentContext, String inputPath, DrillFileSystem fileSystem,
    +                         List<SchemaPath> columns, LogFormatPlugin.LogFormatConfig config) throws OutOfMemoryException {
    +    try {
    +      Path hdfsPath = new Path(inputPath);
    +      Configuration conf = new Configuration();
    +      FSDataInputStream fsStream = fileSystem.open(hdfsPath);
    +      CompressionCodecFactory factory = new CompressionCodecFactory(conf);
    +      CompressionCodec codec = factory.getCodec(hdfsPath);
    +      if (codec == null) {
    +        reader = new BufferedReader(new InputStreamReader(fsStream.getWrappedStream(), "UTF-8"));
    +      } else {
    +        CompressionInputStream comInputStream = codec.createInputStream(fsStream.getWrappedStream());
    +        reader = new BufferedReader(new InputStreamReader(comInputStream));
    +      }
    +      this.inputPath = inputPath;
    +      this.lineCount = 0;
    +      this.config = config;
    +      this.buffer = fragmentContext.getManagedBuffer(4096);
    +      setColumns(columns);
    +
    +    } catch (IOException e) {
    +      logger.debug("Log Reader Plugin: " + e.getMessage());
    +    }
    +  }
    +
    +  public void setup(final OperatorContext context, final OutputMutator output) throws ExecutionSetupException {
    +    this.writer = new VectorContainerWriter(output);
    +    String regex = config.getPattern();
    +
    +
    +    fieldNames = config.getFieldNames();
    +    dataTypes = config.getDataTypes();
    +    dateFormat = config.getDateFormat();
    +    timeFormat = config.getTimeFormat();
    +    errorOnMismatch = config.getErrorOnMismatch();
    +
    +    /*
    +    This section will check for;
    +    1.  Empty regex
    +    2.  Invalid Regex
    +    3.  Empty date string if the date format is used
    +    4.  No capturing groups in the regex
    +    5.  Incorrect number of data types
    +    6.  Invalid data types
    +     */
    +    if (regex.isEmpty()) {
    +      throw UserException.parseError().message("Log parser requires a valid, non-empty regex in the plugin configuration").build(logger);
    +    } else {
    +      //TODO Check for invalid regex
    +      r = Pattern.compile(regex);
    +      Matcher m = r.matcher("test");
    +      if (m.groupCount() == 0) {
    +        throw UserException.parseError().message("Invalid Regular Expression: No Capturing Groups", 0).build(logger);
    +      } else if (m.groupCount() != (fieldNames.size())) {
    +        throw UserException.parseError().message("Invalid Regular Expression: Field names do not match capturing groups.  There are " + m.groupCount() + " captured groups in the data and " + fieldNames.size() + " specified in the configuration.", 0).build(logger);
    +
    +      } else if ((dataTypes == null) || m.groupCount() != dataTypes.size()) {
    +        //If the number of data types is not correct, create a list of varchar
    +        dataTypes = new ArrayList<String>();
    +        for (int i = -0; i < m.groupCount(); i++) {
    --- End diff --
    
    `i = 0` should work, `-0` isn't necessary.


---

[GitHub] drill pull request #1114: Drill-6104: Added Logfile Reader

Posted by cgivre <gi...@git.apache.org>.
Github user cgivre commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1114#discussion_r170675258
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogFormatPlugin.java ---
    @@ -0,0 +1,151 @@
    +package org.apache.drill.exec.store.log;
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +
    +import com.fasterxml.jackson.annotation.JsonInclude;
    +import com.fasterxml.jackson.annotation.JsonTypeName;
    +import com.google.common.collect.ImmutableList;
    +import org.apache.drill.common.exceptions.ExecutionSetupException;
    +import org.apache.drill.common.expression.SchemaPath;
    +import org.apache.drill.common.logical.FormatPluginConfig;
    +import org.apache.drill.common.logical.StoragePluginConfig;
    +import org.apache.drill.exec.ops.FragmentContext;
    +import org.apache.drill.exec.proto.UserBitShared;
    +import org.apache.drill.exec.server.DrillbitContext;
    +import org.apache.drill.exec.store.RecordReader;
    +import org.apache.drill.exec.store.RecordWriter;
    +import org.apache.drill.exec.store.dfs.DrillFileSystem;
    +import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
    +import org.apache.drill.exec.store.dfs.easy.EasyWriter;
    +import org.apache.drill.exec.store.dfs.easy.FileWork;
    +import org.apache.hadoop.conf.Configuration;
    +
    +import java.io.IOException;
    +import java.util.List;
    +
    +public class LogFormatPlugin extends EasyFormatPlugin<LogFormatPlugin.LogFormatConfig> {
    +
    +  private static final boolean IS_COMPRESSIBLE = true;
    +  private static final String DEFAULT_NAME = "log";
    +  private LogFormatConfig config;
    +
    +  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LogFormatPlugin.class);
    +
    +  public LogFormatPlugin(String name, DrillbitContext context, Configuration fsConf, StoragePluginConfig storageConfig) {
    +    this(name, context, fsConf, storageConfig, new LogFormatConfig());
    +  }
    +
    +  public LogFormatPlugin(String name, DrillbitContext context, Configuration fsConf, StoragePluginConfig config, LogFormatConfig formatPluginConfig) {
    +    super(name, context, fsConf, config, formatPluginConfig, true, false, false, IS_COMPRESSIBLE, formatPluginConfig.getExtensions(), DEFAULT_NAME);
    +    this.config = formatPluginConfig;
    +  }
    +
    +  @Override
    +  public RecordReader getRecordReader(FragmentContext context, DrillFileSystem dfs, FileWork fileWork,
    +                                      List<SchemaPath> columns, String userName) throws ExecutionSetupException {
    +    return new LogRecordReader(context, fileWork.getPath(), dfs, columns, config);
    +  }
    +
    +  @Override
    +  public int getReaderOperatorType() {
    +    return UserBitShared.CoreOperatorType.JSON_SUB_SCAN_VALUE;
    +  }
    +
    +  @Override
    +  public int getWriterOperatorType() {
    +    throw new UnsupportedOperationException();
    +  }
    +
    +  @Override
    +  public boolean supportsPushDown() {
    +    return true;
    +  }
    +
    +  @Override
    +  public RecordWriter getRecordWriter(FragmentContext context, EasyWriter writer) throws IOException {
    +    return null;
    +  }
    +
    +  @JsonTypeName("log")
    +  public static class LogFormatConfig implements FormatPluginConfig {
    +    public List<String> extensions;
    +    public List<String> fieldNames;
    +    public List<String> dataTypes;
    +    public String dateFormat = "";
    +    public String timeFormat = "HH:mm:ss";
    +    public String pattern;
    +    public Boolean errorOnMismatch = false;
    +
    +    private static final List<String> DEFAULT_EXTS = ImmutableList.of("log");
    +
    +    @JsonInclude(JsonInclude.Include.NON_DEFAULT)
    +    public List<String> getExtensions() {
    +      if (extensions == null) {
    +        return DEFAULT_EXTS;
    +      }
    +      return extensions;
    +    }
    +
    +    public List<String> getFieldNames() {
    +      return fieldNames;
    +    }
    +
    +    public List<String> getDataTypes() {
    +      return dataTypes;
    +    }
    +
    +    public String getDateFormat() {
    +      return dateFormat;
    +    }
    +
    +    public String getTimeFormat() {
    +
    +      return timeFormat;
    +    }
    +
    +    public String getPattern() {
    +      return pattern;
    +    }
    +
    +    public Boolean getErrorOnMismatch() {
    +      return errorOnMismatch;
    +    }
    +
    +    @Override
    +    public int hashCode() {
    +      int result = pattern != null ? pattern.hashCode() : 0;
    +      result = 31 * result + (dateFormat != null ? dateFormat.hashCode() : 0) + (timeFormat != null ? timeFormat.hashCode() : 0);
    +      return result;
    +    }
    +
    +    @Override
    +    public boolean equals(Object obj) {
    +      if (this == obj) {
    +        return true;
    +      } else if (obj == null) {
    +        return false;
    +      } else if (getClass() == obj.getClass()) {
    --- End diff --
    
    I'm honestly not really sure what this code does, but I fixed it per your recommendation.


---

[GitHub] drill pull request #1114: Drill-6104: Added Logfile Reader

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1114#discussion_r167131126
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java ---
    @@ -0,0 +1,261 @@
    +package org.apache.drill.exec.store.log;
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +import io.netty.buffer.DrillBuf;
    +import org.apache.drill.common.exceptions.ExecutionSetupException;
    +import org.apache.drill.common.exceptions.UserException;
    +import org.apache.drill.common.expression.SchemaPath;
    +import org.apache.drill.exec.exception.OutOfMemoryException;
    +import org.apache.drill.exec.ops.FragmentContext;
    +import org.apache.drill.exec.ops.OperatorContext;
    +import org.apache.drill.exec.physical.impl.OutputMutator;
    +import org.apache.drill.exec.store.AbstractRecordReader;
    +import org.apache.drill.exec.store.dfs.DrillFileSystem;
    +import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
    +import org.apache.drill.exec.vector.complex.writer.BaseWriter;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FSDataInputStream;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.io.compress.CompressionCodec;
    +import org.apache.hadoop.io.compress.CompressionCodecFactory;
    +import org.apache.hadoop.io.compress.CompressionInputStream;
    +
    +import java.io.BufferedReader;
    +import java.io.IOException;
    +import java.io.InputStreamReader;
    +import java.text.ParseException;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.regex.Matcher;
    +import java.util.regex.Pattern;
    +
    +public class LogRecordReader extends AbstractRecordReader {
    +
    +  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LogRecordReader.class);
    +  private static final int MAX_RECORDS_PER_BATCH = 8096;
    +
    +  private String inputPath;
    +  private BufferedReader reader;
    +  private DrillBuf buffer;
    +  private VectorContainerWriter writer;
    +  private LogFormatPlugin.LogFormatConfig config;
    +  private int lineCount;
    +  private Pattern r;
    +
    +  private List<String> fieldNames;
    +  private List<String> dataTypes;
    +  private boolean errorOnMismatch;
    +  private String dateFormat;
    +  private String timeFormat;
    +  private java.text.DateFormat df;
    +  private java.text.DateFormat tf;
    +  private long time;
    +
    +  public LogRecordReader(FragmentContext fragmentContext, String inputPath, DrillFileSystem fileSystem,
    +                         List<SchemaPath> columns, LogFormatPlugin.LogFormatConfig config) throws OutOfMemoryException {
    +    try {
    +      Path hdfsPath = new Path(inputPath);
    +      Configuration conf = new Configuration();
    +      FSDataInputStream fsStream = fileSystem.open(hdfsPath);
    +      CompressionCodecFactory factory = new CompressionCodecFactory(conf);
    +      CompressionCodec codec = factory.getCodec(hdfsPath);
    +      if (codec == null) {
    +        reader = new BufferedReader(new InputStreamReader(fsStream.getWrappedStream(), "UTF-8"));
    +      } else {
    +        CompressionInputStream comInputStream = codec.createInputStream(fsStream.getWrappedStream());
    +        reader = new BufferedReader(new InputStreamReader(comInputStream));
    +      }
    +      this.inputPath = inputPath;
    +      this.lineCount = 0;
    +      this.config = config;
    +      this.buffer = fragmentContext.getManagedBuffer(4096);
    +      setColumns(columns);
    +
    +    } catch (IOException e) {
    +      logger.debug("Log Reader Plugin: " + e.getMessage());
    +    }
    +  }
    +
    +  public void setup(final OperatorContext context, final OutputMutator output) throws ExecutionSetupException {
    +    this.writer = new VectorContainerWriter(output);
    +    String regex = config.getPattern();
    +
    +
    +    fieldNames = config.getFieldNames();
    +    dataTypes = config.getDataTypes();
    +    dateFormat = config.getDateFormat();
    +    timeFormat = config.getTimeFormat();
    +    errorOnMismatch = config.getErrorOnMismatch();
    +
    +    /*
    +    This section will check for;
    +    1.  Empty regex
    +    2.  Invalid Regex
    +    3.  Empty date string if the date format is used
    +    4.  No capturing groups in the regex
    +    5.  Incorrect number of data types
    +    6.  Invalid data types
    +     */
    +    if (regex.isEmpty()) {
    +      throw UserException.parseError().message("Log parser requires a valid, non-empty regex in the plugin configuration").build(logger);
    +    } else {
    +      //TODO Check for invalid regex
    +      r = Pattern.compile(regex);
    +      Matcher m = r.matcher("test");
    +      if (m.groupCount() == 0) {
    +        throw UserException.parseError().message("Invalid Regular Expression: No Capturing Groups", 0).build(logger);
    +      } else if (m.groupCount() != (fieldNames.size())) {
    +        throw UserException.parseError().message("Invalid Regular Expression: Field names do not match capturing groups.  There are " + m.groupCount() + " captured groups in the data and " + fieldNames.size() + " specified in the configuration.", 0).build(logger);
    +
    +      } else if ((dataTypes == null) || m.groupCount() != dataTypes.size()) {
    +        //If the number of data types is not correct, create a list of varchar
    +        dataTypes = new ArrayList<String>();
    +        for (int i = -0; i < m.groupCount(); i++) {
    +          dataTypes.add("VARCHAR");
    +        }
    +      }
    +    }
    +
    +    //Check and set up date formats
    +    if (dataTypes.contains("DATE") || dataTypes.contains("TIMESTAMP")) {
    +      if (dateFormat != null && !dateFormat.isEmpty()) {
    +        df = new java.text.SimpleDateFormat(dateFormat);
    +      } else {
    +        throw UserException.parseError().message("Invalid date format.  The date formatting string was empty.  Please specify a valid date format string in the configuration for this data source.", 0).build(logger);
    +      }
    +    }
    +
    +    if (dataTypes.contains("TIME")) {
    --- End diff --
    
    As above.


---

[GitHub] drill pull request #1114: Drill-6104: Added Logfile Reader

Posted by cgivre <gi...@git.apache.org>.
Github user cgivre commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1114#discussion_r170674675
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java ---
    @@ -0,0 +1,261 @@
    +package org.apache.drill.exec.store.log;
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +import io.netty.buffer.DrillBuf;
    +import org.apache.drill.common.exceptions.ExecutionSetupException;
    +import org.apache.drill.common.exceptions.UserException;
    +import org.apache.drill.common.expression.SchemaPath;
    +import org.apache.drill.exec.exception.OutOfMemoryException;
    +import org.apache.drill.exec.ops.FragmentContext;
    +import org.apache.drill.exec.ops.OperatorContext;
    +import org.apache.drill.exec.physical.impl.OutputMutator;
    +import org.apache.drill.exec.store.AbstractRecordReader;
    +import org.apache.drill.exec.store.dfs.DrillFileSystem;
    +import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
    +import org.apache.drill.exec.vector.complex.writer.BaseWriter;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FSDataInputStream;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.io.compress.CompressionCodec;
    +import org.apache.hadoop.io.compress.CompressionCodecFactory;
    +import org.apache.hadoop.io.compress.CompressionInputStream;
    +
    +import java.io.BufferedReader;
    +import java.io.IOException;
    +import java.io.InputStreamReader;
    +import java.text.ParseException;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.regex.Matcher;
    +import java.util.regex.Pattern;
    +
    +public class LogRecordReader extends AbstractRecordReader {
    +
    +  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LogRecordReader.class);
    +  private static final int MAX_RECORDS_PER_BATCH = 8096;
    +
    +  private String inputPath;
    +  private BufferedReader reader;
    +  private DrillBuf buffer;
    +  private VectorContainerWriter writer;
    +  private LogFormatPlugin.LogFormatConfig config;
    +  private int lineCount;
    +  private Pattern r;
    +
    +  private List<String> fieldNames;
    +  private List<String> dataTypes;
    +  private boolean errorOnMismatch;
    +  private String dateFormat;
    +  private String timeFormat;
    +  private java.text.DateFormat df;
    +  private java.text.DateFormat tf;
    +  private long time;
    +
    +  public LogRecordReader(FragmentContext fragmentContext, String inputPath, DrillFileSystem fileSystem,
    +                         List<SchemaPath> columns, LogFormatPlugin.LogFormatConfig config) throws OutOfMemoryException {
    +    try {
    +      Path hdfsPath = new Path(inputPath);
    +      Configuration conf = new Configuration();
    +      FSDataInputStream fsStream = fileSystem.open(hdfsPath);
    +      CompressionCodecFactory factory = new CompressionCodecFactory(conf);
    +      CompressionCodec codec = factory.getCodec(hdfsPath);
    +      if (codec == null) {
    +        reader = new BufferedReader(new InputStreamReader(fsStream.getWrappedStream(), "UTF-8"));
    +      } else {
    +        CompressionInputStream comInputStream = codec.createInputStream(fsStream.getWrappedStream());
    +        reader = new BufferedReader(new InputStreamReader(comInputStream));
    +      }
    +      this.inputPath = inputPath;
    +      this.lineCount = 0;
    +      this.config = config;
    +      this.buffer = fragmentContext.getManagedBuffer(4096);
    +      setColumns(columns);
    +
    +    } catch (IOException e) {
    +      logger.debug("Log Reader Plugin: " + e.getMessage());
    +    }
    +  }
    +
    +  public void setup(final OperatorContext context, final OutputMutator output) throws ExecutionSetupException {
    +    this.writer = new VectorContainerWriter(output);
    +    String regex = config.getPattern();
    +
    +
    +    fieldNames = config.getFieldNames();
    +    dataTypes = config.getDataTypes();
    +    dateFormat = config.getDateFormat();
    +    timeFormat = config.getTimeFormat();
    +    errorOnMismatch = config.getErrorOnMismatch();
    +
    +    /*
    +    This section will check for;
    +    1.  Empty regex
    +    2.  Invalid Regex
    +    3.  Empty date string if the date format is used
    +    4.  No capturing groups in the regex
    +    5.  Incorrect number of data types
    +    6.  Invalid data types
    +     */
    +    if (regex.isEmpty()) {
    +      throw UserException.parseError().message("Log parser requires a valid, non-empty regex in the plugin configuration").build(logger);
    +    } else {
    +      //TODO Check for invalid regex
    +      r = Pattern.compile(regex);
    +      Matcher m = r.matcher("test");
    +      if (m.groupCount() == 0) {
    +        throw UserException.parseError().message("Invalid Regular Expression: No Capturing Groups", 0).build(logger);
    +      } else if (m.groupCount() != (fieldNames.size())) {
    +        throw UserException.parseError().message("Invalid Regular Expression: Field names do not match capturing groups.  There are " + m.groupCount() + " captured groups in the data and " + fieldNames.size() + " specified in the configuration.", 0).build(logger);
    +
    +      } else if ((dataTypes == null) || m.groupCount() != dataTypes.size()) {
    +        //If the number of data types is not correct, create a list of varchar
    +        dataTypes = new ArrayList<String>();
    +        for (int i = -0; i < m.groupCount(); i++) {
    +          dataTypes.add("VARCHAR");
    +        }
    +      }
    +    }
    +
    +    //Check and set up date formats
    +    if (dataTypes.contains("DATE") || dataTypes.contains("TIMESTAMP")) {
    +      if (dateFormat != null && !dateFormat.isEmpty()) {
    +        df = new java.text.SimpleDateFormat(dateFormat);
    +      } else {
    +        throw UserException.parseError().message("Invalid date format.  The date formatting string was empty.  Please specify a valid date format string in the configuration for this data source.", 0).build(logger);
    +      }
    +    }
    +
    +    if (dataTypes.contains("TIME")) {
    +      if (timeFormat != null && !timeFormat.isEmpty()) {
    +        tf = new java.text.SimpleDateFormat(timeFormat);
    +      } else {
    +        throw UserException.parseError().message("Invalid time format.  The time formatting string was empty.  Please specify a valid time format string in the configuration for this data source.", 0).build(logger);
    +      }
    +    }
    +
    +  }
    +
    +  public int next() {
    +    this.writer.allocate();
    +    this.writer.reset();
    +
    +    int recordCount = 0;
    +
    +    try {
    +      BaseWriter.MapWriter map = this.writer.rootAsMap();
    +      String line = null;
    +
    +      while (recordCount < MAX_RECORDS_PER_BATCH && (line = this.reader.readLine()) != null) {
    +        lineCount++;
    +
    +        // Skip empty lines
    +        if (line.trim().length() == 0) {
    +          continue;
    +        }
    +
    +        this.writer.setPosition(recordCount);
    +        map.start();
    --- End diff --
    
    A question here: there is a boolean config variable called `errorOnMismatch`. If it is set to false, the mismatched rows are added to a column called `unmatched_lines`; if it is set to true, an exception is thrown. That being the case, should I still move this?


---

[GitHub] drill pull request #1114: Drill-6104: Added Logfile Reader

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1114#discussion_r167132395
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java ---
    @@ -0,0 +1,261 @@
    +package org.apache.drill.exec.store.log;
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +import io.netty.buffer.DrillBuf;
    +import org.apache.drill.common.exceptions.ExecutionSetupException;
    +import org.apache.drill.common.exceptions.UserException;
    +import org.apache.drill.common.expression.SchemaPath;
    +import org.apache.drill.exec.exception.OutOfMemoryException;
    +import org.apache.drill.exec.ops.FragmentContext;
    +import org.apache.drill.exec.ops.OperatorContext;
    +import org.apache.drill.exec.physical.impl.OutputMutator;
    +import org.apache.drill.exec.store.AbstractRecordReader;
    +import org.apache.drill.exec.store.dfs.DrillFileSystem;
    +import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
    +import org.apache.drill.exec.vector.complex.writer.BaseWriter;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FSDataInputStream;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.io.compress.CompressionCodec;
    +import org.apache.hadoop.io.compress.CompressionCodecFactory;
    +import org.apache.hadoop.io.compress.CompressionInputStream;
    +
    +import java.io.BufferedReader;
    +import java.io.IOException;
    +import java.io.InputStreamReader;
    +import java.text.ParseException;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.regex.Matcher;
    +import java.util.regex.Pattern;
    +
    +public class LogRecordReader extends AbstractRecordReader {
    +
    +  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LogRecordReader.class);
    +  private static final int MAX_RECORDS_PER_BATCH = 8096;
    +
    +  private String inputPath;
    +  private BufferedReader reader;
    +  private DrillBuf buffer;
    +  private VectorContainerWriter writer;
    +  private LogFormatPlugin.LogFormatConfig config;
    +  private int lineCount;
    +  private Pattern r;
    +
    +  private List<String> fieldNames;
    +  private List<String> dataTypes;
    +  private boolean errorOnMismatch;
    +  private String dateFormat;
    +  private String timeFormat;
    +  private java.text.DateFormat df;
    +  private java.text.DateFormat tf;
    +  private long time;
    +
    +  public LogRecordReader(FragmentContext fragmentContext, String inputPath, DrillFileSystem fileSystem,
    +                         List<SchemaPath> columns, LogFormatPlugin.LogFormatConfig config) throws OutOfMemoryException {
    +    try {
    +      Path hdfsPath = new Path(inputPath);
    +      Configuration conf = new Configuration();
    +      FSDataInputStream fsStream = fileSystem.open(hdfsPath);
    +      CompressionCodecFactory factory = new CompressionCodecFactory(conf);
    +      CompressionCodec codec = factory.getCodec(hdfsPath);
    +      if (codec == null) {
    +        reader = new BufferedReader(new InputStreamReader(fsStream.getWrappedStream(), "UTF-8"));
    +      } else {
    +        CompressionInputStream comInputStream = codec.createInputStream(fsStream.getWrappedStream());
    +        reader = new BufferedReader(new InputStreamReader(comInputStream));
    +      }
    +      this.inputPath = inputPath;
    +      this.lineCount = 0;
    +      this.config = config;
    +      this.buffer = fragmentContext.getManagedBuffer(4096);
    +      setColumns(columns);
    +
    +    } catch (IOException e) {
    +      logger.debug("Log Reader Plugin: " + e.getMessage());
    +    }
    +  }
    +
    +  public void setup(final OperatorContext context, final OutputMutator output) throws ExecutionSetupException {
    +    this.writer = new VectorContainerWriter(output);
    +    String regex = config.getPattern();
    +
    +
    +    fieldNames = config.getFieldNames();
    +    dataTypes = config.getDataTypes();
    +    dateFormat = config.getDateFormat();
    +    timeFormat = config.getTimeFormat();
    +    errorOnMismatch = config.getErrorOnMismatch();
    +
    +    /*
    +    This section will check for;
    +    1.  Empty regex
    +    2.  Invalid Regex
    +    3.  Empty date string if the date format is used
    +    4.  No capturing groups in the regex
    +    5.  Incorrect number of data types
    +    6.  Invalid data types
    +     */
    +    if (regex.isEmpty()) {
    +      throw UserException.parseError().message("Log parser requires a valid, non-empty regex in the plugin configuration").build(logger);
    +    } else {
    +      //TODO Check for invalid regex
    +      r = Pattern.compile(regex);
    +      Matcher m = r.matcher("test");
    +      if (m.groupCount() == 0) {
    +        throw UserException.parseError().message("Invalid Regular Expression: No Capturing Groups", 0).build(logger);
    +      } else if (m.groupCount() != (fieldNames.size())) {
    +        throw UserException.parseError().message("Invalid Regular Expression: Field names do not match capturing groups.  There are " + m.groupCount() + " captured groups in the data and " + fieldNames.size() + " specified in the configuration.", 0).build(logger);
    +
    +      } else if ((dataTypes == null) || m.groupCount() != dataTypes.size()) {
    +        //If the number of data types is not correct, create a list of varchar
    +        dataTypes = new ArrayList<String>();
    +        for (int i = -0; i < m.groupCount(); i++) {
    +          dataTypes.add("VARCHAR");
    +        }
    +      }
    +    }
    +
    +    //Check and set up date formats
    +    if (dataTypes.contains("DATE") || dataTypes.contains("TIMESTAMP")) {
    +      if (dateFormat != null && !dateFormat.isEmpty()) {
    +        df = new java.text.SimpleDateFormat(dateFormat);
    +      } else {
    +        throw UserException.parseError().message("Invalid date format.  The date formatting string was empty.  Please specify a valid date format string in the configuration for this data source.", 0).build(logger);
    +      }
    +    }
    +
    +    if (dataTypes.contains("TIME")) {
    +      if (timeFormat != null && !timeFormat.isEmpty()) {
    +        tf = new java.text.SimpleDateFormat(timeFormat);
    +      } else {
    +        throw UserException.parseError().message("Invalid time format.  The time formatting string was empty.  Please specify a valid time format string in the configuration for this data source.", 0).build(logger);
    +      }
    +    }
    +
    +  }
    +
    +  public int next() {
    +    this.writer.allocate();
    +    this.writer.reset();
    +
    +    int recordCount = 0;
    +
    +    try {
    +      BaseWriter.MapWriter map = this.writer.rootAsMap();
    +      String line = null;
    +
    +      while (recordCount < MAX_RECORDS_PER_BATCH && (line = this.reader.readLine()) != null) {
    +        lineCount++;
    +
    +        // Skip empty lines
    +        if (line.trim().length() == 0) {
    +          continue;
    +        }
    +
    +        this.writer.setPosition(recordCount);
    +        map.start();
    +
    +        Matcher m = r.matcher(line);
    +        if (m.find()) {
    +          for (int i = 1; i <= m.groupCount(); i++) {
    +
    +            String fieldName = fieldNames.get(i - 1);
    +            String type = dataTypes.get(i - 1);
    +            String fieldValue;
    +
    +            fieldValue = m.group(i);
    +
    +            if (fieldValue == null) {
    +              fieldValue = "";
    +            }
    +
    +            if (type.toUpperCase().equals("INT") || type.toUpperCase().equals("INTEGER")) {
    +              map.integer(fieldName).writeInt(Integer.parseInt(fieldValue));
    +            } else if (type.toUpperCase().equals("DOUBLE") || type.toUpperCase().equals("FLOAT8")) {
    +              map.float8(fieldName).writeFloat8(Double.parseDouble(fieldValue));
    +            } else if (type.toUpperCase().equals("FLOAT") || type.toUpperCase().equals("FLOAT4")) {
    +              map.float4(fieldName).writeFloat4(Float.parseFloat(fieldValue));
    +            } else if (type.toUpperCase().equals("DATE")) {
    +              try {
    +                java.util.Date d = df.parse(fieldValue);
    +                long milliseconds = d.getTime();
    +                map.date(fieldName).writeDate(milliseconds);
    +              } catch (ParseException e) {
    +                if (errorOnMismatch) {
    +                  throw new ParseException(
    +                      "Date Format String " + dateFormat + " does not match date string " + fieldValue + " on line " + lineCount + ".", 0
    +                  );
    +                }
    +              }
    +            } else if (type.toUpperCase().equals("TIMESTAMP")) {
    +              try {
    +                java.util.Date d = df.parse(fieldValue);
    +                long milliseconds = d.getTime();
    +                map.timeStamp(fieldName).writeTimeStamp(milliseconds);
    +              } catch (ParseException e) {
    +                if (errorOnMismatch) {
    +                  throw new ParseException(
    +                      "Date Format String " + dateFormat + " does not match date string " + fieldValue + " on line " + lineCount + ".", 0
    +                  );
    +                }
    +              }
    +            } else if (type.toUpperCase().equals("TIME")) {
    +              java.util.Date t = tf.parse(fieldValue);
    +
    +              int milliseconds = (int) ((t.getHours() * 3600000) +
    +                  (t.getMinutes() * 60000) +
    +                  (t.getSeconds() * 1000));
    +
    +              map.time(fieldName).writeTime(milliseconds);
    +            } else {
    +              byte[] bytes = fieldValue.getBytes("UTF-8");
    +              int stringLength = bytes.length;
    +              this.buffer.setBytes(0, bytes, 0, stringLength);
    +              map.varChar(fieldName).writeVarChar(0, stringLength, buffer);
    +            }
    +          }
    +        } else {
    +          if (errorOnMismatch) {
    +            throw new ParseException("Line does not match pattern: " + inputPath + "\n" + lineCount + ":\n" + line, 0);
    +          } else {
    +            String fieldName = "unmatched_lines";
    +            byte[] bytes = line.getBytes("UTF-8");
    +            this.buffer.setBytes(0, bytes, 0, bytes.length);
    +            map.varChar(fieldName).writeVarChar(0, bytes.length, buffer);
    +          }
    +        }
    +
    +        map.end();
    +        recordCount++;
    +      }
    +
    +      this.writer.setValueCount(recordCount);
    +      return recordCount;
    +
    +    } catch (final Exception e) {
    +      throw UserException.dataReadError(e).build(logger);
    +    }
    +  }
    +
    +  public void close() throws Exception {
    +    this.reader.close();
    +  }
    +}
    --- End diff --
    
    No unit tests?


---

[GitHub] drill pull request #1114: Drill-6104: Added Logfile Reader

Posted by cgivre <gi...@git.apache.org>.
Github user cgivre commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1114#discussion_r170675353
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java ---
    @@ -0,0 +1,261 @@
    +package org.apache.drill.exec.store.log;
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +import io.netty.buffer.DrillBuf;
    +import org.apache.drill.common.exceptions.ExecutionSetupException;
    +import org.apache.drill.common.exceptions.UserException;
    +import org.apache.drill.common.expression.SchemaPath;
    +import org.apache.drill.exec.exception.OutOfMemoryException;
    +import org.apache.drill.exec.ops.FragmentContext;
    +import org.apache.drill.exec.ops.OperatorContext;
    +import org.apache.drill.exec.physical.impl.OutputMutator;
    +import org.apache.drill.exec.store.AbstractRecordReader;
    +import org.apache.drill.exec.store.dfs.DrillFileSystem;
    +import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
    +import org.apache.drill.exec.vector.complex.writer.BaseWriter;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FSDataInputStream;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.io.compress.CompressionCodec;
    +import org.apache.hadoop.io.compress.CompressionCodecFactory;
    +import org.apache.hadoop.io.compress.CompressionInputStream;
    +
    +import java.io.BufferedReader;
    +import java.io.IOException;
    +import java.io.InputStreamReader;
    +import java.text.ParseException;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.regex.Matcher;
    +import java.util.regex.Pattern;
    +
    +public class LogRecordReader extends AbstractRecordReader {
    +
    +  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LogRecordReader.class);
    +  private static final int MAX_RECORDS_PER_BATCH = 8096;
    +
    +  private String inputPath;
    +  private BufferedReader reader;
    +  private DrillBuf buffer;
    +  private VectorContainerWriter writer;
    +  private LogFormatPlugin.LogFormatConfig config;
    +  private int lineCount;
    +  private Pattern r;
    +
    +  private List<String> fieldNames;
    +  private List<String> dataTypes;
    +  private boolean errorOnMismatch;
    +  private String dateFormat;
    +  private String timeFormat;
    +  private java.text.DateFormat df;
    +  private java.text.DateFormat tf;
    +  private long time;
    +
    +  public LogRecordReader(FragmentContext fragmentContext, String inputPath, DrillFileSystem fileSystem,
    +                         List<SchemaPath> columns, LogFormatPlugin.LogFormatConfig config) throws OutOfMemoryException {
    +    try {
    +      Path hdfsPath = new Path(inputPath);
    +      Configuration conf = new Configuration();
    +      FSDataInputStream fsStream = fileSystem.open(hdfsPath);
    +      CompressionCodecFactory factory = new CompressionCodecFactory(conf);
    +      CompressionCodec codec = factory.getCodec(hdfsPath);
    +      if (codec == null) {
    +        reader = new BufferedReader(new InputStreamReader(fsStream.getWrappedStream(), "UTF-8"));
    +      } else {
    +        CompressionInputStream comInputStream = codec.createInputStream(fsStream.getWrappedStream());
    +        reader = new BufferedReader(new InputStreamReader(comInputStream));
    +      }
    +      this.inputPath = inputPath;
    +      this.lineCount = 0;
    +      this.config = config;
    +      this.buffer = fragmentContext.getManagedBuffer(4096);
    +      setColumns(columns);
    +
    +    } catch (IOException e) {
    +      logger.debug("Log Reader Plugin: " + e.getMessage());
    +    }
    +  }
    +
    +  public void setup(final OperatorContext context, final OutputMutator output) throws ExecutionSetupException {
    +    this.writer = new VectorContainerWriter(output);
    +    String regex = config.getPattern();
    +
    +
    +    fieldNames = config.getFieldNames();
    +    dataTypes = config.getDataTypes();
    +    dateFormat = config.getDateFormat();
    +    timeFormat = config.getTimeFormat();
    +    errorOnMismatch = config.getErrorOnMismatch();
    +
    +    /*
    +    This section will check for;
    +    1.  Empty regex
    +    2.  Invalid Regex
    +    3.  Empty date string if the date format is used
    +    4.  No capturing groups in the regex
    +    5.  Incorrect number of data types
    +    6.  Invalid data types
    +     */
    +    if (regex.isEmpty()) {
    +      throw UserException.parseError().message("Log parser requires a valid, non-empty regex in the plugin configuration").build(logger);
    +    } else {
    --- End diff --
    
    Fixed


---

[GitHub] drill pull request #1114: Drill-6104: Added Logfile Reader

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1114#discussion_r167131367
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java ---
    @@ -0,0 +1,261 @@
    +package org.apache.drill.exec.store.log;
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +import io.netty.buffer.DrillBuf;
    +import org.apache.drill.common.exceptions.ExecutionSetupException;
    +import org.apache.drill.common.exceptions.UserException;
    +import org.apache.drill.common.expression.SchemaPath;
    +import org.apache.drill.exec.exception.OutOfMemoryException;
    +import org.apache.drill.exec.ops.FragmentContext;
    +import org.apache.drill.exec.ops.OperatorContext;
    +import org.apache.drill.exec.physical.impl.OutputMutator;
    +import org.apache.drill.exec.store.AbstractRecordReader;
    +import org.apache.drill.exec.store.dfs.DrillFileSystem;
    +import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
    +import org.apache.drill.exec.vector.complex.writer.BaseWriter;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FSDataInputStream;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.io.compress.CompressionCodec;
    +import org.apache.hadoop.io.compress.CompressionCodecFactory;
    +import org.apache.hadoop.io.compress.CompressionInputStream;
    +
    +import java.io.BufferedReader;
    +import java.io.IOException;
    +import java.io.InputStreamReader;
    +import java.text.ParseException;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.regex.Matcher;
    +import java.util.regex.Pattern;
    +
    +public class LogRecordReader extends AbstractRecordReader {
    +
    +  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LogRecordReader.class);
    +  private static final int MAX_RECORDS_PER_BATCH = 8096;
    +
    +  private String inputPath;
    +  private BufferedReader reader;
    +  private DrillBuf buffer;
    +  private VectorContainerWriter writer;
    +  private LogFormatPlugin.LogFormatConfig config;
    +  private int lineCount;
    +  private Pattern r;
    +
    +  private List<String> fieldNames;
    +  private List<String> dataTypes;
    +  private boolean errorOnMismatch;
    +  private String dateFormat;
    +  private String timeFormat;
    +  private java.text.DateFormat df;
    +  private java.text.DateFormat tf;
    +  private long time;
    +
    +  public LogRecordReader(FragmentContext fragmentContext, String inputPath, DrillFileSystem fileSystem,
    +                         List<SchemaPath> columns, LogFormatPlugin.LogFormatConfig config) throws OutOfMemoryException {
    +    try {
    +      Path hdfsPath = new Path(inputPath);
    +      Configuration conf = new Configuration();
    +      FSDataInputStream fsStream = fileSystem.open(hdfsPath);
    +      CompressionCodecFactory factory = new CompressionCodecFactory(conf);
    +      CompressionCodec codec = factory.getCodec(hdfsPath);
    +      if (codec == null) {
    +        reader = new BufferedReader(new InputStreamReader(fsStream.getWrappedStream(), "UTF-8"));
    +      } else {
    +        CompressionInputStream comInputStream = codec.createInputStream(fsStream.getWrappedStream());
    +        reader = new BufferedReader(new InputStreamReader(comInputStream));
    +      }
    +      this.inputPath = inputPath;
    +      this.lineCount = 0;
    +      this.config = config;
    +      this.buffer = fragmentContext.getManagedBuffer(4096);
    +      setColumns(columns);
    +
    +    } catch (IOException e) {
    +      logger.debug("Log Reader Plugin: " + e.getMessage());
    +    }
    +  }
    +
    +  public void setup(final OperatorContext context, final OutputMutator output) throws ExecutionSetupException {
    +    this.writer = new VectorContainerWriter(output);
    +    String regex = config.getPattern();
    +
    +
    +    fieldNames = config.getFieldNames();
    +    dataTypes = config.getDataTypes();
    +    dateFormat = config.getDateFormat();
    +    timeFormat = config.getTimeFormat();
    +    errorOnMismatch = config.getErrorOnMismatch();
    +
    +    /*
    +    This section will check for;
    +    1.  Empty regex
    +    2.  Invalid Regex
    +    3.  Empty date string if the date format is used
    +    4.  No capturing groups in the regex
    +    5.  Incorrect number of data types
    +    6.  Invalid data types
    +     */
    +    if (regex.isEmpty()) {
    +      throw UserException.parseError().message("Log parser requires a valid, non-empty regex in the plugin configuration").build(logger);
    +    } else {
    +      //TODO Check for invalid regex
    +      r = Pattern.compile(regex);
    +      Matcher m = r.matcher("test");
    +      if (m.groupCount() == 0) {
    +        throw UserException.parseError().message("Invalid Regular Expression: No Capturing Groups", 0).build(logger);
    +      } else if (m.groupCount() != (fieldNames.size())) {
    +        throw UserException.parseError().message("Invalid Regular Expression: Field names do not match capturing groups.  There are " + m.groupCount() + " captured groups in the data and " + fieldNames.size() + " specified in the configuration.", 0).build(logger);
    +
    +      } else if ((dataTypes == null) || m.groupCount() != dataTypes.size()) {
    +        //If the number of data types is not correct, create a list of varchar
    +        dataTypes = new ArrayList<String>();
    +        for (int i = -0; i < m.groupCount(); i++) {
    +          dataTypes.add("VARCHAR");
    +        }
    +      }
    +    }
    +
    +    //Check and set up date formats
    +    if (dataTypes.contains("DATE") || dataTypes.contains("TIMESTAMP")) {
    +      if (dateFormat != null && !dateFormat.isEmpty()) {
    +        df = new java.text.SimpleDateFormat(dateFormat);
    +      } else {
    +        throw UserException.parseError().message("Invalid date format.  The date formatting string was empty.  Please specify a valid date format string in the configuration for this data source.", 0).build(logger);
    +      }
    +    }
    +
    +    if (dataTypes.contains("TIME")) {
    +      if (timeFormat != null && !timeFormat.isEmpty()) {
    +        tf = new java.text.SimpleDateFormat(timeFormat);
    +      } else {
    +        throw UserException.parseError().message("Invalid time format.  The time formatting string was empty.  Please specify a valid time format string in the configuration for this data source.", 0).build(logger);
    +      }
    +    }
    +
    +  }
    +
    +  public int next() {
    +    this.writer.allocate();
    +    this.writer.reset();
    +
    +    int recordCount = 0;
    +
    +    try {
    +      BaseWriter.MapWriter map = this.writer.rootAsMap();
    +      String line = null;
    +
    +      while (recordCount < MAX_RECORDS_PER_BATCH && (line = this.reader.readLine()) != null) {
    +        lineCount++;
    +
    +        // Skip empty lines
    +        if (line.trim().length() == 0) {
    --- End diff --
    
    Better:
    
    ```
    line = line.trim();
    if (line.isEmpty()) { ...
    ```
    
    That way, the matcher below gets the benefit of the trimmed whitespace.


---

[GitHub] drill pull request #1114: Drill-6104: Added Logfile Reader

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1114#discussion_r167131250
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogFormatPlugin.java ---
    @@ -0,0 +1,151 @@
    +package org.apache.drill.exec.store.log;
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +
    +import com.fasterxml.jackson.annotation.JsonInclude;
    +import com.fasterxml.jackson.annotation.JsonTypeName;
    +import com.google.common.collect.ImmutableList;
    +import org.apache.drill.common.exceptions.ExecutionSetupException;
    +import org.apache.drill.common.expression.SchemaPath;
    +import org.apache.drill.common.logical.FormatPluginConfig;
    +import org.apache.drill.common.logical.StoragePluginConfig;
    +import org.apache.drill.exec.ops.FragmentContext;
    +import org.apache.drill.exec.proto.UserBitShared;
    +import org.apache.drill.exec.server.DrillbitContext;
    +import org.apache.drill.exec.store.RecordReader;
    +import org.apache.drill.exec.store.RecordWriter;
    +import org.apache.drill.exec.store.dfs.DrillFileSystem;
    +import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
    +import org.apache.drill.exec.store.dfs.easy.EasyWriter;
    +import org.apache.drill.exec.store.dfs.easy.FileWork;
    +import org.apache.hadoop.conf.Configuration;
    +
    +import java.io.IOException;
    +import java.util.List;
    +
    +public class LogFormatPlugin extends EasyFormatPlugin<LogFormatPlugin.LogFormatConfig> {
    +
    +  private static final boolean IS_COMPRESSIBLE = true;
    +  private static final String DEFAULT_NAME = "log";
    +  private LogFormatConfig config;
    +
    +  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LogFormatPlugin.class);
    +
    +  public LogFormatPlugin(String name, DrillbitContext context, Configuration fsConf, StoragePluginConfig storageConfig) {
    +    this(name, context, fsConf, storageConfig, new LogFormatConfig());
    +  }
    +
    +  public LogFormatPlugin(String name, DrillbitContext context, Configuration fsConf, StoragePluginConfig config, LogFormatConfig formatPluginConfig) {
    +    super(name, context, fsConf, config, formatPluginConfig, true, false, false, IS_COMPRESSIBLE, formatPluginConfig.getExtensions(), DEFAULT_NAME);
    +    this.config = formatPluginConfig;
    +  }
    +
    +  @Override
    +  public RecordReader getRecordReader(FragmentContext context, DrillFileSystem dfs, FileWork fileWork,
    +                                      List<SchemaPath> columns, String userName) throws ExecutionSetupException {
    +    return new LogRecordReader(context, fileWork.getPath(), dfs, columns, config);
    +  }
    +
    +  @Override
    +  public int getReaderOperatorType() {
    +    return UserBitShared.CoreOperatorType.JSON_SUB_SCAN_VALUE;
    +  }
    +
    +  @Override
    +  public int getWriterOperatorType() {
    +    throw new UnsupportedOperationException();
    +  }
    +
    +  @Override
    +  public boolean supportsPushDown() {
    +    return true;
    --- End diff --
    
    This claims that this plugin handles (projection) push-down, but no such code exists in the implementation.


---

[GitHub] drill pull request #1114: Drill-6104: Added Logfile Reader

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1114#discussion_r167131536
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java ---
    @@ -0,0 +1,261 @@
    +package org.apache.drill.exec.store.log;
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +import io.netty.buffer.DrillBuf;
    +import org.apache.drill.common.exceptions.ExecutionSetupException;
    +import org.apache.drill.common.exceptions.UserException;
    +import org.apache.drill.common.expression.SchemaPath;
    +import org.apache.drill.exec.exception.OutOfMemoryException;
    +import org.apache.drill.exec.ops.FragmentContext;
    +import org.apache.drill.exec.ops.OperatorContext;
    +import org.apache.drill.exec.physical.impl.OutputMutator;
    +import org.apache.drill.exec.store.AbstractRecordReader;
    +import org.apache.drill.exec.store.dfs.DrillFileSystem;
    +import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
    +import org.apache.drill.exec.vector.complex.writer.BaseWriter;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FSDataInputStream;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.io.compress.CompressionCodec;
    +import org.apache.hadoop.io.compress.CompressionCodecFactory;
    +import org.apache.hadoop.io.compress.CompressionInputStream;
    +
    +import java.io.BufferedReader;
    +import java.io.IOException;
    +import java.io.InputStreamReader;
    +import java.text.ParseException;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.regex.Matcher;
    +import java.util.regex.Pattern;
    +
    +public class LogRecordReader extends AbstractRecordReader {
    +
    +  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LogRecordReader.class);
    +  private static final int MAX_RECORDS_PER_BATCH = 8096;
    +
    +  private String inputPath;
    +  private BufferedReader reader;
    +  private DrillBuf buffer;
    +  private VectorContainerWriter writer;
    +  private LogFormatPlugin.LogFormatConfig config;
    +  private int lineCount;
    +  private Pattern r;
    +
    +  private List<String> fieldNames;
    +  private List<String> dataTypes;
    +  private boolean errorOnMismatch;
    +  private String dateFormat;
    +  private String timeFormat;
    +  private java.text.DateFormat df;
    +  private java.text.DateFormat tf;
    +  private long time;
    +
    +  public LogRecordReader(FragmentContext fragmentContext, String inputPath, DrillFileSystem fileSystem,
    +                         List<SchemaPath> columns, LogFormatPlugin.LogFormatConfig config) throws OutOfMemoryException {
    +    try {
    +      Path hdfsPath = new Path(inputPath);
    +      Configuration conf = new Configuration();
    +      FSDataInputStream fsStream = fileSystem.open(hdfsPath);
    +      CompressionCodecFactory factory = new CompressionCodecFactory(conf);
    +      CompressionCodec codec = factory.getCodec(hdfsPath);
    +      if (codec == null) {
    +        reader = new BufferedReader(new InputStreamReader(fsStream.getWrappedStream(), "UTF-8"));
    +      } else {
    +        CompressionInputStream comInputStream = codec.createInputStream(fsStream.getWrappedStream());
    +        reader = new BufferedReader(new InputStreamReader(comInputStream));
    +      }
    +      this.inputPath = inputPath;
    +      this.lineCount = 0;
    +      this.config = config;
    +      this.buffer = fragmentContext.getManagedBuffer(4096);
    +      setColumns(columns);
    +
    +    } catch (IOException e) {
    +      logger.debug("Log Reader Plugin: " + e.getMessage());
    +    }
    +  }
    +
    +  public void setup(final OperatorContext context, final OutputMutator output) throws ExecutionSetupException {
    +    this.writer = new VectorContainerWriter(output);
    +    String regex = config.getPattern();
    +
    +
    +    fieldNames = config.getFieldNames();
    +    dataTypes = config.getDataTypes();
    +    dateFormat = config.getDateFormat();
    +    timeFormat = config.getTimeFormat();
    +    errorOnMismatch = config.getErrorOnMismatch();
    +
    +    /*
    +    This section will check for;
    +    1.  Empty regex
    +    2.  Invalid Regex
    +    3.  Empty date string if the date format is used
    +    4.  No capturing groups in the regex
    +    5.  Incorrect number of data types
    +    6.  Invalid data types
    +     */
    +    if (regex.isEmpty()) {
    +      throw UserException.parseError().message("Log parser requires a valid, non-empty regex in the plugin configuration").build(logger);
    +    } else {
    +      //TODO Check for invalid regex
    +      r = Pattern.compile(regex);
    +      Matcher m = r.matcher("test");
    +      if (m.groupCount() == 0) {
    +        throw UserException.parseError().message("Invalid Regular Expression: No Capturing Groups", 0).build(logger);
    +      } else if (m.groupCount() != (fieldNames.size())) {
    +        throw UserException.parseError().message("Invalid Regular Expression: Field names do not match capturing groups.  There are " + m.groupCount() + " captured groups in the data and " + fieldNames.size() + " specified in the configuration.", 0).build(logger);
    +
    +      } else if ((dataTypes == null) || m.groupCount() != dataTypes.size()) {
    +        //If the number of data types is not correct, create a list of varchar
    +        dataTypes = new ArrayList<String>();
    +        for (int i = -0; i < m.groupCount(); i++) {
    +          dataTypes.add("VARCHAR");
    +        }
    +      }
    +    }
    +
    +    //Check and set up date formats
    +    if (dataTypes.contains("DATE") || dataTypes.contains("TIMESTAMP")) {
    +      if (dateFormat != null && !dateFormat.isEmpty()) {
    +        df = new java.text.SimpleDateFormat(dateFormat);
    +      } else {
    +        throw UserException.parseError().message("Invalid date format.  The date formatting string was empty.  Please specify a valid date format string in the configuration for this data source.", 0).build(logger);
    +      }
    +    }
    +
    +    if (dataTypes.contains("TIME")) {
    +      if (timeFormat != null && !timeFormat.isEmpty()) {
    +        tf = new java.text.SimpleDateFormat(timeFormat);
    +      } else {
    +        throw UserException.parseError().message("Invalid time format.  The time formatting string was empty.  Please specify a valid time format string in the configuration for this data source.", 0).build(logger);
    +      }
    +    }
    +
    +  }
    +
    +  public int next() {
    +    this.writer.allocate();
    +    this.writer.reset();
    +
    +    int recordCount = 0;
    +
    +    try {
    +      BaseWriter.MapWriter map = this.writer.rootAsMap();
    +      String line = null;
    +
    +      while (recordCount < MAX_RECORDS_PER_BATCH && (line = this.reader.readLine()) != null) {
    +        lineCount++;
    +
    +        // Skip empty lines
    +        if (line.trim().length() == 0) {
    +          continue;
    +        }
    +
    +        this.writer.setPosition(recordCount);
    +        map.start();
    +
    +        Matcher m = r.matcher(line);
    +        if (m.find()) {
    +          for (int i = 1; i <= m.groupCount(); i++) {
    --- End diff --
    
    Again, the computer will handle this just fine either way, but moving the row-processing code into another method will reduce the nesting levels that we poor reviewers have to track.


---

[GitHub] drill pull request #1114: Drill-6104: Added Logfile Reader

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1114#discussion_r167132068
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java ---
    @@ -0,0 +1,261 @@
    +package org.apache.drill.exec.store.log;
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +import io.netty.buffer.DrillBuf;
    +import org.apache.drill.common.exceptions.ExecutionSetupException;
    +import org.apache.drill.common.exceptions.UserException;
    +import org.apache.drill.common.expression.SchemaPath;
    +import org.apache.drill.exec.exception.OutOfMemoryException;
    +import org.apache.drill.exec.ops.FragmentContext;
    +import org.apache.drill.exec.ops.OperatorContext;
    +import org.apache.drill.exec.physical.impl.OutputMutator;
    +import org.apache.drill.exec.store.AbstractRecordReader;
    +import org.apache.drill.exec.store.dfs.DrillFileSystem;
    +import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
    +import org.apache.drill.exec.vector.complex.writer.BaseWriter;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FSDataInputStream;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.io.compress.CompressionCodec;
    +import org.apache.hadoop.io.compress.CompressionCodecFactory;
    +import org.apache.hadoop.io.compress.CompressionInputStream;
    +
    +import java.io.BufferedReader;
    +import java.io.IOException;
    +import java.io.InputStreamReader;
    +import java.text.ParseException;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.regex.Matcher;
    +import java.util.regex.Pattern;
    +
    +public class LogRecordReader extends AbstractRecordReader {
    +
    +  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LogRecordReader.class);
    +  private static final int MAX_RECORDS_PER_BATCH = 8096;
    +
    +  private String inputPath;
    +  private BufferedReader reader;
    +  private DrillBuf buffer;
    +  private VectorContainerWriter writer;
    +  private LogFormatPlugin.LogFormatConfig config;
    +  private int lineCount;
    +  private Pattern r;
    +
    +  private List<String> fieldNames;
    +  private List<String> dataTypes;
    +  private boolean errorOnMismatch;
    +  private String dateFormat;
    +  private String timeFormat;
    +  private java.text.DateFormat df;
    +  private java.text.DateFormat tf;
    +  private long time;
    +
    +  public LogRecordReader(FragmentContext fragmentContext, String inputPath, DrillFileSystem fileSystem,
    +                         List<SchemaPath> columns, LogFormatPlugin.LogFormatConfig config) throws OutOfMemoryException {
    +    try {
    +      Path hdfsPath = new Path(inputPath);
    +      Configuration conf = new Configuration();
    +      FSDataInputStream fsStream = fileSystem.open(hdfsPath);
    +      CompressionCodecFactory factory = new CompressionCodecFactory(conf);
    +      CompressionCodec codec = factory.getCodec(hdfsPath);
    +      if (codec == null) {
    +        reader = new BufferedReader(new InputStreamReader(fsStream.getWrappedStream(), "UTF-8"));
    +      } else {
    +        CompressionInputStream comInputStream = codec.createInputStream(fsStream.getWrappedStream());
    +        reader = new BufferedReader(new InputStreamReader(comInputStream));
    +      }
    +      this.inputPath = inputPath;
    +      this.lineCount = 0;
    +      this.config = config;
    +      this.buffer = fragmentContext.getManagedBuffer(4096);
    +      setColumns(columns);
    +
    +    } catch (IOException e) {
    +      logger.debug("Log Reader Plugin: " + e.getMessage());
    +    }
    +  }
    +
    +  public void setup(final OperatorContext context, final OutputMutator output) throws ExecutionSetupException {
    +    this.writer = new VectorContainerWriter(output);
    +    String regex = config.getPattern();
    +
    +
    +    fieldNames = config.getFieldNames();
    +    dataTypes = config.getDataTypes();
    +    dateFormat = config.getDateFormat();
    +    timeFormat = config.getTimeFormat();
    +    errorOnMismatch = config.getErrorOnMismatch();
    +
    +    /*
    +    This section will check for;
    +    1.  Empty regex
    +    2.  Invalid Regex
    +    3.  Empty date string if the date format is used
    +    4.  No capturing groups in the regex
    +    5.  Incorrect number of data types
    +    6.  Invalid data types
    +     */
    +    if (regex.isEmpty()) {
    +      throw UserException.parseError().message("Log parser requires a valid, non-empty regex in the plugin configuration").build(logger);
    +    } else {
    +      //TODO Check for invalid regex
    +      r = Pattern.compile(regex);
    +      Matcher m = r.matcher("test");
    +      if (m.groupCount() == 0) {
    +        throw UserException.parseError().message("Invalid Regular Expression: No Capturing Groups", 0).build(logger);
    +      } else if (m.groupCount() != (fieldNames.size())) {
    +        throw UserException.parseError().message("Invalid Regular Expression: Field names do not match capturing groups.  There are " + m.groupCount() + " captured groups in the data and " + fieldNames.size() + " specified in the configuration.", 0).build(logger);
    +
    +      } else if ((dataTypes == null) || m.groupCount() != dataTypes.size()) {
    +        //If the number of data types is not correct, create a list of varchar
    +        dataTypes = new ArrayList<String>();
    +        for (int i = -0; i < m.groupCount(); i++) {
    +          dataTypes.add("VARCHAR");
    +        }
    +      }
    +    }
    +
    +    //Check and set up date formats
    +    if (dataTypes.contains("DATE") || dataTypes.contains("TIMESTAMP")) {
    +      if (dateFormat != null && !dateFormat.isEmpty()) {
    +        df = new java.text.SimpleDateFormat(dateFormat);
    +      } else {
    +        throw UserException.parseError().message("Invalid date format.  The date formatting string was empty.  Please specify a valid date format string in the configuration for this data source.", 0).build(logger);
    +      }
    +    }
    +
    +    if (dataTypes.contains("TIME")) {
    +      if (timeFormat != null && !timeFormat.isEmpty()) {
    +        tf = new java.text.SimpleDateFormat(timeFormat);
    +      } else {
    +        throw UserException.parseError().message("Invalid time format.  The time formatting string was empty.  Please specify a valid time format string in the configuration for this data source.", 0).build(logger);
    +      }
    +    }
    +
    +  }
    +
    +  public int next() {
    +    this.writer.allocate();
    +    this.writer.reset();
    +
    +    int recordCount = 0;
    +
    +    try {
    +      BaseWriter.MapWriter map = this.writer.rootAsMap();
    +      String line = null;
    +
    +      while (recordCount < MAX_RECORDS_PER_BATCH && (line = this.reader.readLine()) != null) {
    +        lineCount++;
    +
    +        // Skip empty lines
    +        if (line.trim().length() == 0) {
    +          continue;
    +        }
    +
    +        this.writer.setPosition(recordCount);
    +        map.start();
    +
    +        Matcher m = r.matcher(line);
    +        if (m.find()) {
    +          for (int i = 1; i <= m.groupCount(); i++) {
    +
    +            String fieldName = fieldNames.get(i - 1);
    +            String type = dataTypes.get(i - 1);
    +            String fieldValue;
    +
    +            fieldValue = m.group(i);
    +
    +            if (fieldValue == null) {
    +              fieldValue = "";
    +            }
    +
    +            if (type.toUpperCase().equals("INT") || type.toUpperCase().equals("INTEGER")) {
    +              map.integer(fieldName).writeInt(Integer.parseInt(fieldValue));
    +            } else if (type.toUpperCase().equals("DOUBLE") || type.toUpperCase().equals("FLOAT8")) {
    --- End diff --
    
    Even better. This is an inner loop. Presumably we want it to go fast (which is why it is deeply nested).
    
    So, rather than doing (slow) string compares in the loop (where you could at least have used a `switch` statement instead), map the type names to ordinals once in the `setup()` method: `MinorType.ordinal()`. Then do a `switch` on the resulting numbers using, say, `MinorType.VARCHAR.ordinal()`. Much faster.


---

[GitHub] drill pull request #1114: Drill-6104: Added Logfile Reader

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1114#discussion_r167128268
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogFormatPlugin.java ---
    @@ -0,0 +1,151 @@
    +package org.apache.drill.exec.store.log;
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +
    +import com.fasterxml.jackson.annotation.JsonInclude;
    +import com.fasterxml.jackson.annotation.JsonTypeName;
    +import com.google.common.collect.ImmutableList;
    +import org.apache.drill.common.exceptions.ExecutionSetupException;
    +import org.apache.drill.common.expression.SchemaPath;
    +import org.apache.drill.common.logical.FormatPluginConfig;
    +import org.apache.drill.common.logical.StoragePluginConfig;
    +import org.apache.drill.exec.ops.FragmentContext;
    +import org.apache.drill.exec.proto.UserBitShared;
    +import org.apache.drill.exec.server.DrillbitContext;
    +import org.apache.drill.exec.store.RecordReader;
    +import org.apache.drill.exec.store.RecordWriter;
    +import org.apache.drill.exec.store.dfs.DrillFileSystem;
    +import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
    +import org.apache.drill.exec.store.dfs.easy.EasyWriter;
    +import org.apache.drill.exec.store.dfs.easy.FileWork;
    +import org.apache.hadoop.conf.Configuration;
    +
    +import java.io.IOException;
    +import java.util.List;
    +
    +public class LogFormatPlugin extends EasyFormatPlugin<LogFormatPlugin.LogFormatConfig> {
    +
    +  private static final boolean IS_COMPRESSIBLE = true;
    +  private static final String DEFAULT_NAME = "log";
    +  private LogFormatConfig config;
    +
    +  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LogFormatPlugin.class);
    +
    +  public LogFormatPlugin(String name, DrillbitContext context, Configuration fsConf, StoragePluginConfig storageConfig) {
    +    this(name, context, fsConf, storageConfig, new LogFormatConfig());
    +  }
    +
    +  public LogFormatPlugin(String name, DrillbitContext context, Configuration fsConf, StoragePluginConfig config, LogFormatConfig formatPluginConfig) {
    +    super(name, context, fsConf, config, formatPluginConfig, true, false, false, IS_COMPRESSIBLE, formatPluginConfig.getExtensions(), DEFAULT_NAME);
    +    this.config = formatPluginConfig;
    +  }
    +
    +  @Override
    +  public RecordReader getRecordReader(FragmentContext context, DrillFileSystem dfs, FileWork fileWork,
    +                                      List<SchemaPath> columns, String userName) throws ExecutionSetupException {
    +    return new LogRecordReader(context, fileWork.getPath(), dfs, columns, config);
    +  }
    +
    +  @Override
    +  public int getReaderOperatorType() {
    +    return UserBitShared.CoreOperatorType.JSON_SUB_SCAN_VALUE;
    +  }
    +
    +  @Override
    +  public int getWriterOperatorType() {
    +    throw new UnsupportedOperationException();
    +  }
    +
    +  @Override
    +  public boolean supportsPushDown() {
    +    return true;
    +  }
    +
    +  @Override
    +  public RecordWriter getRecordWriter(FragmentContext context, EasyWriter writer) throws IOException {
    +    return null;
    +  }
    +
    +  @JsonTypeName("log")
    +  public static class LogFormatConfig implements FormatPluginConfig {
    +    public List<String> extensions;
    +    public List<String> fieldNames;
    +    public List<String> dataTypes;
    +    public String dateFormat = "";
    +    public String timeFormat = "HH:mm:ss";
    +    public String pattern;
    +    public Boolean errorOnMismatch = false;
    +
    +    private static final List<String> DEFAULT_EXTS = ImmutableList.of("log");
    --- End diff --
    
    To ensure this plugin is usable by default, maybe define a default layout that matches the entire line as a single column?


---

[GitHub] drill pull request #1114: Drill-6104: Added Logfile Reader

Posted by cgivre <gi...@git.apache.org>.
Github user cgivre commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1114#discussion_r168939597
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java ---
    @@ -0,0 +1,261 @@
    +package org.apache.drill.exec.store.log;
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +import io.netty.buffer.DrillBuf;
    +import org.apache.drill.common.exceptions.ExecutionSetupException;
    +import org.apache.drill.common.exceptions.UserException;
    +import org.apache.drill.common.expression.SchemaPath;
    +import org.apache.drill.exec.exception.OutOfMemoryException;
    +import org.apache.drill.exec.ops.FragmentContext;
    +import org.apache.drill.exec.ops.OperatorContext;
    +import org.apache.drill.exec.physical.impl.OutputMutator;
    +import org.apache.drill.exec.store.AbstractRecordReader;
    +import org.apache.drill.exec.store.dfs.DrillFileSystem;
    +import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
    +import org.apache.drill.exec.vector.complex.writer.BaseWriter;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FSDataInputStream;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.io.compress.CompressionCodec;
    +import org.apache.hadoop.io.compress.CompressionCodecFactory;
    +import org.apache.hadoop.io.compress.CompressionInputStream;
    +
    +import java.io.BufferedReader;
    +import java.io.IOException;
    +import java.io.InputStreamReader;
    +import java.text.ParseException;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.regex.Matcher;
    +import java.util.regex.Pattern;
    +
    +public class LogRecordReader extends AbstractRecordReader {
    +
    +  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LogRecordReader.class);
    +  private static final int MAX_RECORDS_PER_BATCH = 8096;
    +
    +  private String inputPath;
    +  private BufferedReader reader;
    +  private DrillBuf buffer;
    +  private VectorContainerWriter writer;
    +  private LogFormatPlugin.LogFormatConfig config;
    +  private int lineCount;
    +  private Pattern r;
    +
    +  private List<String> fieldNames;
    +  private List<String> dataTypes;
    +  private boolean errorOnMismatch;
    +  private String dateFormat;
    +  private String timeFormat;
    +  private java.text.DateFormat df;
    +  private java.text.DateFormat tf;
    +  private long time;
    +
    +  public LogRecordReader(FragmentContext fragmentContext, String inputPath, DrillFileSystem fileSystem,
    +                         List<SchemaPath> columns, LogFormatPlugin.LogFormatConfig config) throws OutOfMemoryException {
    +    try {
    +      Path hdfsPath = new Path(inputPath);
    +      Configuration conf = new Configuration();
    +      FSDataInputStream fsStream = fileSystem.open(hdfsPath);
    +      CompressionCodecFactory factory = new CompressionCodecFactory(conf);
    +      CompressionCodec codec = factory.getCodec(hdfsPath);
    +      if (codec == null) {
    +        reader = new BufferedReader(new InputStreamReader(fsStream.getWrappedStream(), "UTF-8"));
    +      } else {
    +        CompressionInputStream comInputStream = codec.createInputStream(fsStream.getWrappedStream());
    +        reader = new BufferedReader(new InputStreamReader(comInputStream));
    +      }
    +      this.inputPath = inputPath;
    +      this.lineCount = 0;
    +      this.config = config;
    +      this.buffer = fragmentContext.getManagedBuffer(4096);
    +      setColumns(columns);
    +
    +    } catch (IOException e) {
    +      logger.debug("Log Reader Plugin: " + e.getMessage());
    +    }
    +  }
    +
    +  public void setup(final OperatorContext context, final OutputMutator output) throws ExecutionSetupException {
    +    this.writer = new VectorContainerWriter(output);
    +    String regex = config.getPattern();
    +
    +
    +    fieldNames = config.getFieldNames();
    +    dataTypes = config.getDataTypes();
    +    dateFormat = config.getDateFormat();
    +    timeFormat = config.getTimeFormat();
    +    errorOnMismatch = config.getErrorOnMismatch();
    +
    +    /*
    +    This section will check for;
    +    1.  Empty regex
    +    2.  Invalid Regex
    +    3.  Empty date string if the date format is used
    +    4.  No capturing groups in the regex
    +    5.  Incorrect number of data types
    +    6.  Invalid data types
    +     */
    +    if (regex.isEmpty()) {
    +      throw UserException.parseError().message("Log parser requires a valid, non-empty regex in the plugin configuration").build(logger);
    +    } else {
    +      //TODO Check for invalid regex
    +      r = Pattern.compile(regex);
    +      Matcher m = r.matcher("test");
    +      if (m.groupCount() == 0) {
    +        throw UserException.parseError().message("Invalid Regular Expression: No Capturing Groups", 0).build(logger);
    +      } else if (m.groupCount() != (fieldNames.size())) {
    +        throw UserException.parseError().message("Invalid Regular Expression: Field names do not match capturing groups.  There are " + m.groupCount() + " captured groups in the data and " + fieldNames.size() + " specified in the configuration.", 0).build(logger);
    +
    +      } else if ((dataTypes == null) || m.groupCount() != dataTypes.size()) {
    +        //If the number of data types is not correct, create a list of varchar
    +        dataTypes = new ArrayList<String>();
    +        for (int i = -0; i < m.groupCount(); i++) {
    +          dataTypes.add("VARCHAR");
    +        }
    +      }
    +    }
    +
    +    //Check and set up date formats
    +    if (dataTypes.contains("DATE") || dataTypes.contains("TIMESTAMP")) {
    +      if (dateFormat != null && !dateFormat.isEmpty()) {
    +        df = new java.text.SimpleDateFormat(dateFormat);
    +      } else {
    +        throw UserException.parseError().message("Invalid date format.  The date formatting string was empty.  Please specify a valid date format string in the configuration for this data source.", 0).build(logger);
    +      }
    +    }
    +
    +    if (dataTypes.contains("TIME")) {
    +      if (timeFormat != null && !timeFormat.isEmpty()) {
    +        tf = new java.text.SimpleDateFormat(timeFormat);
    +      } else {
    +        throw UserException.parseError().message("Invalid time format.  The time formatting string was empty.  Please specify a valid time format string in the configuration for this data source.", 0).build(logger);
    +      }
    +    }
    +
    +  }
    +
    +  public int next() {
    +    this.writer.allocate();
    +    this.writer.reset();
    +
    +    int recordCount = 0;
    +
    +    try {
    +      BaseWriter.MapWriter map = this.writer.rootAsMap();
    +      String line = null;
    +
    +      while (recordCount < MAX_RECORDS_PER_BATCH && (line = this.reader.readLine()) != null) {
    +        lineCount++;
    +
    +        // Skip empty lines
    +        if (line.trim().length() == 0) {
    +          continue;
    +        }
    +
    +        this.writer.setPosition(recordCount);
    +        map.start();
    +
    +        Matcher m = r.matcher(line);
    +        if (m.find()) {
    +          for (int i = 1; i <= m.groupCount(); i++) {
    +
    +            String fieldName = fieldNames.get(i - 1);
    +            String type = dataTypes.get(i - 1);
    +            String fieldValue;
    +
    +            fieldValue = m.group(i);
    +
    +            if (fieldValue == null) {
    +              fieldValue = "";
    +            }
    +
    +            if (type.toUpperCase().equals("INT") || type.toUpperCase().equals("INTEGER")) {
    +              map.integer(fieldName).writeInt(Integer.parseInt(fieldValue));
    +            } else if (type.toUpperCase().equals("DOUBLE") || type.toUpperCase().equals("FLOAT8")) {
    +              map.float8(fieldName).writeFloat8(Double.parseDouble(fieldValue));
    --- End diff --
    
    Hi @paul-rogers 
    Is there an example of the `ResultSetLoader` that you could point me to?


---

[GitHub] drill pull request #1114: Drill-6104: Added Logfile Reader

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1114#discussion_r167128074
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogFormatPlugin.java ---
    @@ -0,0 +1,151 @@
    +package org.apache.drill.exec.store.log;
    --- End diff --
    
    Generally, the package statement goes below the copyright notice.


---

[GitHub] drill pull request #1114: Drill-6104: Added Logfile Reader

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1114#discussion_r167129450
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java ---
    @@ -0,0 +1,261 @@
    +package org.apache.drill.exec.store.log;
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +import io.netty.buffer.DrillBuf;
    +import org.apache.drill.common.exceptions.ExecutionSetupException;
    +import org.apache.drill.common.exceptions.UserException;
    +import org.apache.drill.common.expression.SchemaPath;
    +import org.apache.drill.exec.exception.OutOfMemoryException;
    +import org.apache.drill.exec.ops.FragmentContext;
    +import org.apache.drill.exec.ops.OperatorContext;
    +import org.apache.drill.exec.physical.impl.OutputMutator;
    +import org.apache.drill.exec.store.AbstractRecordReader;
    +import org.apache.drill.exec.store.dfs.DrillFileSystem;
    +import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
    +import org.apache.drill.exec.vector.complex.writer.BaseWriter;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FSDataInputStream;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.io.compress.CompressionCodec;
    +import org.apache.hadoop.io.compress.CompressionCodecFactory;
    +import org.apache.hadoop.io.compress.CompressionInputStream;
    +
    +import java.io.BufferedReader;
    +import java.io.IOException;
    +import java.io.InputStreamReader;
    +import java.text.ParseException;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.regex.Matcher;
    +import java.util.regex.Pattern;
    +
    +public class LogRecordReader extends AbstractRecordReader {
    +
    +  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LogRecordReader.class);
    +  private static final int MAX_RECORDS_PER_BATCH = 8096;
    +
    +  private String inputPath;
    +  private BufferedReader reader;
    +  private DrillBuf buffer;
    +  private VectorContainerWriter writer;
    +  private LogFormatPlugin.LogFormatConfig config;
    +  private int lineCount;
    +  private Pattern r;
    +
    +  private List<String> fieldNames;
    +  private List<String> dataTypes;
    +  private boolean errorOnMismatch;
    +  private String dateFormat;
    +  private String timeFormat;
    +  private java.text.DateFormat df;
    +  private java.text.DateFormat tf;
    +  private long time;
    +
    +  public LogRecordReader(FragmentContext fragmentContext, String inputPath, DrillFileSystem fileSystem,
    +                         List<SchemaPath> columns, LogFormatPlugin.LogFormatConfig config) throws OutOfMemoryException {
    +    try {
    +      Path hdfsPath = new Path(inputPath);
    +      Configuration conf = new Configuration();
    +      FSDataInputStream fsStream = fileSystem.open(hdfsPath);
    +      CompressionCodecFactory factory = new CompressionCodecFactory(conf);
    +      CompressionCodec codec = factory.getCodec(hdfsPath);
    +      if (codec == null) {
    +        reader = new BufferedReader(new InputStreamReader(fsStream.getWrappedStream(), "UTF-8"));
    +      } else {
    +        CompressionInputStream comInputStream = codec.createInputStream(fsStream.getWrappedStream());
    +        reader = new BufferedReader(new InputStreamReader(comInputStream));
    +      }
    +      this.inputPath = inputPath;
    +      this.lineCount = 0;
    +      this.config = config;
    +      this.buffer = fragmentContext.getManagedBuffer(4096);
    +      setColumns(columns);
    +
    +    } catch (IOException e) {
    +      logger.debug("Log Reader Plugin: " + e.getMessage());
    +    }
    +  }
    +
    +  public void setup(final OperatorContext context, final OutputMutator output) throws ExecutionSetupException {
    +    this.writer = new VectorContainerWriter(output);
    +    String regex = config.getPattern();
    +
    +
    +    fieldNames = config.getFieldNames();
    +    dataTypes = config.getDataTypes();
    +    dateFormat = config.getDateFormat();
    +    timeFormat = config.getTimeFormat();
    +    errorOnMismatch = config.getErrorOnMismatch();
    +
    +    /*
    +    This section will check for;
    +    1.  Empty regex
    +    2.  Invalid Regex
    +    3.  Empty date string if the date format is used
    +    4.  No capturing groups in the regex
    +    5.  Incorrect number of data types
    +    6.  Invalid data types
    +     */
    +    if (regex.isEmpty()) {
    +      throw UserException.parseError().message("Log parser requires a valid, non-empty regex in the plugin configuration").build(logger);
    +    } else {
    --- End diff --
    
    The `else` is not needed; execution can't reach this point if the exception is thrown.


---

[GitHub] drill issue #1114: Drill-6104: Added Logfile Reader

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on the issue:

    https://github.com/apache/drill/pull/1114
  
    See [this example](https://github.com/paul-rogers/drill/tree/regex-plugin/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/regex), and [this test](https://github.com/paul-rogers/drill/blob/regex-plugin/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/regex/TestRegexReader.java) for examples of one way to address some of the comments made in the code review. That example handles projection, which was mentioned in one of the review comments.
    
    Since the regex is file-specific, the regex format plugin is most useful if it can be configured per-file using table functions. See [DRILL-6167](https://issues.apache.org/jira/browse/DRILL-6167), [DRILL-6168](https://issues.apache.org/jira/browse/DRILL-6168) and [DRILL-6169](https://issues.apache.org/jira/browse/DRILL-6169) for problems that will be encountered. See the example and tests above for how to work around the bugs.
    
    Although I suggested looking at the `ResultSetLoader`, it is a bit premature to do so. That mechanism relies on additional mechanisms that have not yet been committed to master. So, we need to work with the mechanisms we have now. See the example reader above for how to cache the per-column mutator needed to write to vectors without the switch statements in the code in this PR.
    
    To generalize, the example has an object per column that holds per-column info. The example uses only `VarChar` columns. To add additional types, create a base column state class with subclasses for each type. Then, simply call a `save(String value)` method to write a column. That method can handle nulls (for projected non-existent columns) and type conversions (where needed).
    
    Finally, feel free to borrow liberally from the example. (The example was created for our Drill book, so is fair game to reuse.)


---

[GitHub] drill pull request #1114: Drill-6104: Added Logfile Reader

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1114#discussion_r167129411
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java ---
    @@ -0,0 +1,261 @@
    +package org.apache.drill.exec.store.log;
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +import io.netty.buffer.DrillBuf;
    +import org.apache.drill.common.exceptions.ExecutionSetupException;
    +import org.apache.drill.common.exceptions.UserException;
    +import org.apache.drill.common.expression.SchemaPath;
    +import org.apache.drill.exec.exception.OutOfMemoryException;
    +import org.apache.drill.exec.ops.FragmentContext;
    +import org.apache.drill.exec.ops.OperatorContext;
    +import org.apache.drill.exec.physical.impl.OutputMutator;
    +import org.apache.drill.exec.store.AbstractRecordReader;
    +import org.apache.drill.exec.store.dfs.DrillFileSystem;
    +import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
    +import org.apache.drill.exec.vector.complex.writer.BaseWriter;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FSDataInputStream;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.io.compress.CompressionCodec;
    +import org.apache.hadoop.io.compress.CompressionCodecFactory;
    +import org.apache.hadoop.io.compress.CompressionInputStream;
    +
    +import java.io.BufferedReader;
    +import java.io.IOException;
    +import java.io.InputStreamReader;
    +import java.text.ParseException;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.regex.Matcher;
    +import java.util.regex.Pattern;
    +
    +public class LogRecordReader extends AbstractRecordReader {
    +
    +  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LogRecordReader.class);
    +  private static final int MAX_RECORDS_PER_BATCH = 8096;
    +
    +  private String inputPath;
    +  private BufferedReader reader;
    +  private DrillBuf buffer;
    +  private VectorContainerWriter writer;
    +  private LogFormatPlugin.LogFormatConfig config;
    +  private int lineCount;
    +  private Pattern r;
    +
    +  private List<String> fieldNames;
    +  private List<String> dataTypes;
    +  private boolean errorOnMismatch;
    +  private String dateFormat;
    +  private String timeFormat;
    +  private java.text.DateFormat df;
    +  private java.text.DateFormat tf;
    +  private long time;
    +
    +  public LogRecordReader(FragmentContext fragmentContext, String inputPath, DrillFileSystem fileSystem,
    +                         List<SchemaPath> columns, LogFormatPlugin.LogFormatConfig config) throws OutOfMemoryException {
    +    try {
    +      Path hdfsPath = new Path(inputPath);
    +      Configuration conf = new Configuration();
    +      FSDataInputStream fsStream = fileSystem.open(hdfsPath);
    +      CompressionCodecFactory factory = new CompressionCodecFactory(conf);
    +      CompressionCodec codec = factory.getCodec(hdfsPath);
    +      if (codec == null) {
    +        reader = new BufferedReader(new InputStreamReader(fsStream.getWrappedStream(), "UTF-8"));
    +      } else {
    +        CompressionInputStream comInputStream = codec.createInputStream(fsStream.getWrappedStream());
    +        reader = new BufferedReader(new InputStreamReader(comInputStream));
    +      }
    +      this.inputPath = inputPath;
    +      this.lineCount = 0;
    +      this.config = config;
    +      this.buffer = fragmentContext.getManagedBuffer(4096);
    +      setColumns(columns);
    +
    +    } catch (IOException e) {
    +      logger.debug("Log Reader Plugin: " + e.getMessage());
    +    }
    +  }
    +
    +  public void setup(final OperatorContext context, final OutputMutator output) throws ExecutionSetupException {
    +    this.writer = new VectorContainerWriter(output);
    +    String regex = config.getPattern();
    +
    +
    +    fieldNames = config.getFieldNames();
    +    dataTypes = config.getDataTypes();
    +    dateFormat = config.getDateFormat();
    +    timeFormat = config.getTimeFormat();
    +    errorOnMismatch = config.getErrorOnMismatch();
    +
    +    /*
    +    This section will check for;
    +    1.  Empty regex
    +    2.  Invalid Regex
    +    3.  Empty date string if the date format is used
    +    4.  No capturing groups in the regex
    +    5.  Incorrect number of data types
    +    6.  Invalid data types
    +     */
    --- End diff --
    
    Suggestion. The computer will execute this code just as fast if it is broken into methods. But humans can understand the code more easily if this is implemented as:
    
    `validateConfig()` which does what the name suggests, by calling...
    
    `validateRegex()`, `validateDate()`, and so on.


---

[GitHub] drill pull request #1114: Drill-6104: Added Logfile Reader

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1114#discussion_r167131794
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java ---
    @@ -0,0 +1,261 @@
    +package org.apache.drill.exec.store.log;
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +import io.netty.buffer.DrillBuf;
    +import org.apache.drill.common.exceptions.ExecutionSetupException;
    +import org.apache.drill.common.exceptions.UserException;
    +import org.apache.drill.common.expression.SchemaPath;
    +import org.apache.drill.exec.exception.OutOfMemoryException;
    +import org.apache.drill.exec.ops.FragmentContext;
    +import org.apache.drill.exec.ops.OperatorContext;
    +import org.apache.drill.exec.physical.impl.OutputMutator;
    +import org.apache.drill.exec.store.AbstractRecordReader;
    +import org.apache.drill.exec.store.dfs.DrillFileSystem;
    +import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
    +import org.apache.drill.exec.vector.complex.writer.BaseWriter;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FSDataInputStream;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.io.compress.CompressionCodec;
    +import org.apache.hadoop.io.compress.CompressionCodecFactory;
    +import org.apache.hadoop.io.compress.CompressionInputStream;
    +
    +import java.io.BufferedReader;
    +import java.io.IOException;
    +import java.io.InputStreamReader;
    +import java.text.ParseException;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.regex.Matcher;
    +import java.util.regex.Pattern;
    +
    +public class LogRecordReader extends AbstractRecordReader {
    +
    +  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LogRecordReader.class);
    +  private static final int MAX_RECORDS_PER_BATCH = 8096;
    +
    +  private String inputPath;
    +  private BufferedReader reader;
    +  private DrillBuf buffer;
    +  private VectorContainerWriter writer;
    +  private LogFormatPlugin.LogFormatConfig config;
    +  private int lineCount;
    +  private Pattern r;
    +
    +  private List<String> fieldNames;
    +  private List<String> dataTypes;
    +  private boolean errorOnMismatch;
    +  private String dateFormat;
    +  private String timeFormat;
    +  private java.text.DateFormat df;
    +  private java.text.DateFormat tf;
    +  private long time;
    +
    +  public LogRecordReader(FragmentContext fragmentContext, String inputPath, DrillFileSystem fileSystem,
    +                         List<SchemaPath> columns, LogFormatPlugin.LogFormatConfig config) throws OutOfMemoryException {
    +    try {
    +      Path hdfsPath = new Path(inputPath);
    +      Configuration conf = new Configuration();
    +      FSDataInputStream fsStream = fileSystem.open(hdfsPath);
    +      CompressionCodecFactory factory = new CompressionCodecFactory(conf);
    +      CompressionCodec codec = factory.getCodec(hdfsPath);
    +      if (codec == null) {
    +        reader = new BufferedReader(new InputStreamReader(fsStream.getWrappedStream(), "UTF-8"));
    +      } else {
    +        CompressionInputStream comInputStream = codec.createInputStream(fsStream.getWrappedStream());
    +        reader = new BufferedReader(new InputStreamReader(comInputStream));
    +      }
    +      this.inputPath = inputPath;
    +      this.lineCount = 0;
    +      this.config = config;
    +      this.buffer = fragmentContext.getManagedBuffer(4096);
    +      setColumns(columns);
    +
    +    } catch (IOException e) {
    +      logger.debug("Log Reader Plugin: " + e.getMessage());
    +    }
    +  }
    +
    +  public void setup(final OperatorContext context, final OutputMutator output) throws ExecutionSetupException {
    +    this.writer = new VectorContainerWriter(output);
    +    String regex = config.getPattern();
    +
    +
    +    fieldNames = config.getFieldNames();
    +    dataTypes = config.getDataTypes();
    +    dateFormat = config.getDateFormat();
    +    timeFormat = config.getTimeFormat();
    +    errorOnMismatch = config.getErrorOnMismatch();
    +
    +    /*
    +    This section will check for;
    +    1.  Empty regex
    +    2.  Invalid Regex
    +    3.  Empty date string if the date format is used
    +    4.  No capturing groups in the regex
    +    5.  Incorrect number of data types
    +    6.  Invalid data types
    +     */
    +    if (regex.isEmpty()) {
    +      throw UserException.parseError().message("Log parser requires a valid, non-empty regex in the plugin configuration").build(logger);
    +    } else {
    +      //TODO Check for invalid regex
    +      r = Pattern.compile(regex);
    +      Matcher m = r.matcher("test");
    +      if (m.groupCount() == 0) {
    +        throw UserException.parseError().message("Invalid Regular Expression: No Capturing Groups", 0).build(logger);
    +      } else if (m.groupCount() != (fieldNames.size())) {
    +        throw UserException.parseError().message("Invalid Regular Expression: Field names do not match capturing groups.  There are " + m.groupCount() + " captured groups in the data and " + fieldNames.size() + " specified in the configuration.", 0).build(logger);
    +
    +      } else if ((dataTypes == null) || m.groupCount() != dataTypes.size()) {
    +        //If the number of data types is not correct, create a list of varchar
    +        dataTypes = new ArrayList<String>();
    +        for (int i = -0; i < m.groupCount(); i++) {
    +          dataTypes.add("VARCHAR");
    +        }
    +      }
    +    }
    +
    +    //Check and set up date formats
    +    if (dataTypes.contains("DATE") || dataTypes.contains("TIMESTAMP")) {
    +      if (dateFormat != null && !dateFormat.isEmpty()) {
    +        df = new java.text.SimpleDateFormat(dateFormat);
    +      } else {
    +        throw UserException.parseError().message("Invalid date format.  The date formatting string was empty.  Please specify a valid date format string in the configuration for this data source.", 0).build(logger);
    +      }
    +    }
    +
    +    if (dataTypes.contains("TIME")) {
    +      if (timeFormat != null && !timeFormat.isEmpty()) {
    +        tf = new java.text.SimpleDateFormat(timeFormat);
    +      } else {
    +        throw UserException.parseError().message("Invalid time format.  The time formatting string was empty.  Please specify a valid time format string in the configuration for this data source.", 0).build(logger);
    +      }
    +    }
    +
    +  }
    +
    +  public int next() {
    +    this.writer.allocate();
    +    this.writer.reset();
    +
    +    int recordCount = 0;
    +
    +    try {
    +      BaseWriter.MapWriter map = this.writer.rootAsMap();
    +      String line = null;
    +
    +      while (recordCount < MAX_RECORDS_PER_BATCH && (line = this.reader.readLine()) != null) {
    +        lineCount++;
    +
    +        // Skip empty lines
    +        if (line.trim().length() == 0) {
    +          continue;
    +        }
    +
    +        this.writer.setPosition(recordCount);
    +        map.start();
    +
    +        Matcher m = r.matcher(line);
    +        if (m.find()) {
    +          for (int i = 1; i <= m.groupCount(); i++) {
    +
    +            String fieldName = fieldNames.get(i - 1);
    +            String type = dataTypes.get(i - 1);
    +            String fieldValue;
    +
    +            fieldValue = m.group(i);
    +
    +            if (fieldValue == null) {
    +              fieldValue = "";
    +            }
    +
    +            if (type.toUpperCase().equals("INT") || type.toUpperCase().equals("INTEGER")) {
    --- End diff --
    
    Consider using `type.equalsIgnoreCase(MinorType.INT.name())` here, and likewise for the other types.


---

[GitHub] drill pull request #1114: Drill-6104: Added Logfile Reader

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1114#discussion_r167131014
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java ---
    @@ -0,0 +1,261 @@
    +package org.apache.drill.exec.store.log;
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +import io.netty.buffer.DrillBuf;
    +import org.apache.drill.common.exceptions.ExecutionSetupException;
    +import org.apache.drill.common.exceptions.UserException;
    +import org.apache.drill.common.expression.SchemaPath;
    +import org.apache.drill.exec.exception.OutOfMemoryException;
    +import org.apache.drill.exec.ops.FragmentContext;
    +import org.apache.drill.exec.ops.OperatorContext;
    +import org.apache.drill.exec.physical.impl.OutputMutator;
    +import org.apache.drill.exec.store.AbstractRecordReader;
    +import org.apache.drill.exec.store.dfs.DrillFileSystem;
    +import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
    +import org.apache.drill.exec.vector.complex.writer.BaseWriter;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FSDataInputStream;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.io.compress.CompressionCodec;
    +import org.apache.hadoop.io.compress.CompressionCodecFactory;
    +import org.apache.hadoop.io.compress.CompressionInputStream;
    +
    +import java.io.BufferedReader;
    +import java.io.IOException;
    +import java.io.InputStreamReader;
    +import java.text.ParseException;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.regex.Matcher;
    +import java.util.regex.Pattern;
    +
    +public class LogRecordReader extends AbstractRecordReader {
    +
    +  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LogRecordReader.class);
    +  private static final int MAX_RECORDS_PER_BATCH = 8096;
    +
    +  private String inputPath;
    +  private BufferedReader reader;
    +  private DrillBuf buffer;
    +  private VectorContainerWriter writer;
    +  private LogFormatPlugin.LogFormatConfig config;
    +  private int lineCount;
    +  private Pattern r;
    +
    +  private List<String> fieldNames;
    +  private List<String> dataTypes;
    +  private boolean errorOnMismatch;
    +  private String dateFormat;
    +  private String timeFormat;
    +  private java.text.DateFormat df;
    +  private java.text.DateFormat tf;
    +  private long time;
    +
    +  public LogRecordReader(FragmentContext fragmentContext, String inputPath, DrillFileSystem fileSystem,
    +                         List<SchemaPath> columns, LogFormatPlugin.LogFormatConfig config) throws OutOfMemoryException {
    +    try {
    +      Path hdfsPath = new Path(inputPath);
    +      Configuration conf = new Configuration();
    +      FSDataInputStream fsStream = fileSystem.open(hdfsPath);
    +      CompressionCodecFactory factory = new CompressionCodecFactory(conf);
    +      CompressionCodec codec = factory.getCodec(hdfsPath);
    +      if (codec == null) {
    +        reader = new BufferedReader(new InputStreamReader(fsStream.getWrappedStream(), "UTF-8"));
    +      } else {
    +        CompressionInputStream comInputStream = codec.createInputStream(fsStream.getWrappedStream());
    +        reader = new BufferedReader(new InputStreamReader(comInputStream));
    +      }
    +      this.inputPath = inputPath;
    +      this.lineCount = 0;
    +      this.config = config;
    +      this.buffer = fragmentContext.getManagedBuffer(4096);
    +      setColumns(columns);
    +
    +    } catch (IOException e) {
    +      logger.debug("Log Reader Plugin: " + e.getMessage());
    +    }
    +  }
    +
    +  public void setup(final OperatorContext context, final OutputMutator output) throws ExecutionSetupException {
    +    this.writer = new VectorContainerWriter(output);
    +    String regex = config.getPattern();
    +
    +
    +    fieldNames = config.getFieldNames();
    +    dataTypes = config.getDataTypes();
    +    dateFormat = config.getDateFormat();
    +    timeFormat = config.getTimeFormat();
    +    errorOnMismatch = config.getErrorOnMismatch();
    +
    +    /*
    +    This section will check for;
    +    1.  Empty regex
    +    2.  Invalid Regex
    +    3.  Empty date string if the date format is used
    +    4.  No capturing groups in the regex
    +    5.  Incorrect number of data types
    +    6.  Invalid data types
    +     */
    +    if (regex.isEmpty()) {
    +      throw UserException.parseError().message("Log parser requires a valid, non-empty regex in the plugin configuration").build(logger);
    +    } else {
    +      //TODO Check for invalid regex
    +      r = Pattern.compile(regex);
    +      Matcher m = r.matcher("test");
    +      if (m.groupCount() == 0) {
    +        throw UserException.parseError().message("Invalid Regular Expression: No Capturing Groups", 0).build(logger);
    +      } else if (m.groupCount() != (fieldNames.size())) {
    +        throw UserException.parseError().message("Invalid Regular Expression: Field names do not match capturing groups.  There are " + m.groupCount() + " captured groups in the data and " + fieldNames.size() + " specified in the configuration.", 0).build(logger);
    +
    +      } else if ((dataTypes == null) || m.groupCount() != dataTypes.size()) {
    +        //If the number of data types is not correct, create a list of varchar
    +        dataTypes = new ArrayList<String>();
    +        for (int i = -0; i < m.groupCount(); i++) {
    +          dataTypes.add("VARCHAR");
    +        }
    +      }
    +    }
    +
    +    //Check and set up date formats
    +    if (dataTypes.contains("DATE") || dataTypes.contains("TIMESTAMP")) {
    +      if (dateFormat != null && !dateFormat.isEmpty()) {
    +        df = new java.text.SimpleDateFormat(dateFormat);
    --- End diff --
    
    I believe this will throw an exception for a bad format. The code should catch it.
    
    Also, it is OK to import `SimpleDateFormat` and use the class by its simple name. This code does not have the same draconian restrictions as UDFs.


---

[GitHub] drill pull request #1114: Drill-6104: Added Logfile Reader

Posted by cgivre <gi...@git.apache.org>.
Github user cgivre commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1114#discussion_r170690424
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogFormatPlugin.java ---
    @@ -0,0 +1,151 @@
    +package org.apache.drill.exec.store.log;
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +
    +import com.fasterxml.jackson.annotation.JsonInclude;
    +import com.fasterxml.jackson.annotation.JsonTypeName;
    +import com.google.common.collect.ImmutableList;
    +import org.apache.drill.common.exceptions.ExecutionSetupException;
    +import org.apache.drill.common.expression.SchemaPath;
    +import org.apache.drill.common.logical.FormatPluginConfig;
    +import org.apache.drill.common.logical.StoragePluginConfig;
    +import org.apache.drill.exec.ops.FragmentContext;
    +import org.apache.drill.exec.proto.UserBitShared;
    +import org.apache.drill.exec.server.DrillbitContext;
    +import org.apache.drill.exec.store.RecordReader;
    +import org.apache.drill.exec.store.RecordWriter;
    +import org.apache.drill.exec.store.dfs.DrillFileSystem;
    +import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
    +import org.apache.drill.exec.store.dfs.easy.EasyWriter;
    +import org.apache.drill.exec.store.dfs.easy.FileWork;
    +import org.apache.hadoop.conf.Configuration;
    +
    +import java.io.IOException;
    +import java.util.List;
    +
    +public class LogFormatPlugin extends EasyFormatPlugin<LogFormatPlugin.LogFormatConfig> {
    +
    +  private static final boolean IS_COMPRESSIBLE = true;
    +  private static final String DEFAULT_NAME = "log";
    +  private LogFormatConfig config;
    +
    +  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LogFormatPlugin.class);
    +
    +  public LogFormatPlugin(String name, DrillbitContext context, Configuration fsConf, StoragePluginConfig storageConfig) {
    +    this(name, context, fsConf, storageConfig, new LogFormatConfig());
    +  }
    +
    +  public LogFormatPlugin(String name, DrillbitContext context, Configuration fsConf, StoragePluginConfig config, LogFormatConfig formatPluginConfig) {
    +    super(name, context, fsConf, config, formatPluginConfig, true, false, false, IS_COMPRESSIBLE, formatPluginConfig.getExtensions(), DEFAULT_NAME);
    +    this.config = formatPluginConfig;
    +  }
    +
    +  @Override
    +  public RecordReader getRecordReader(FragmentContext context, DrillFileSystem dfs, FileWork fileWork,
    +                                      List<SchemaPath> columns, String userName) throws ExecutionSetupException {
    +    return new LogRecordReader(context, fileWork.getPath(), dfs, columns, config);
    +  }
    +
    +  @Override
    +  public int getReaderOperatorType() {
    +    return UserBitShared.CoreOperatorType.JSON_SUB_SCAN_VALUE;
    +  }
    +
    +  @Override
    +  public int getWriterOperatorType() {
    +    throw new UnsupportedOperationException();
    +  }
    +
    +  @Override
    +  public boolean supportsPushDown() {
    +    return true;
    --- End diff --
    
    @paul-rogers Are there any examples of how to implement this?  I would like this to be as fast as possible.


---

[GitHub] drill pull request #1114: Drill-6104: Added Logfile Reader

Posted by cgivre <gi...@git.apache.org>.
Github user cgivre commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1114#discussion_r170675226
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogFormatPlugin.java ---
    @@ -0,0 +1,151 @@
    +package org.apache.drill.exec.store.log;
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +
    +import com.fasterxml.jackson.annotation.JsonInclude;
    +import com.fasterxml.jackson.annotation.JsonTypeName;
    +import com.google.common.collect.ImmutableList;
    +import org.apache.drill.common.exceptions.ExecutionSetupException;
    +import org.apache.drill.common.expression.SchemaPath;
    +import org.apache.drill.common.logical.FormatPluginConfig;
    +import org.apache.drill.common.logical.StoragePluginConfig;
    +import org.apache.drill.exec.ops.FragmentContext;
    +import org.apache.drill.exec.proto.UserBitShared;
    +import org.apache.drill.exec.server.DrillbitContext;
    +import org.apache.drill.exec.store.RecordReader;
    +import org.apache.drill.exec.store.RecordWriter;
    +import org.apache.drill.exec.store.dfs.DrillFileSystem;
    +import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
    +import org.apache.drill.exec.store.dfs.easy.EasyWriter;
    +import org.apache.drill.exec.store.dfs.easy.FileWork;
    +import org.apache.hadoop.conf.Configuration;
    +
    +import java.io.IOException;
    +import java.util.List;
    +
    +public class LogFormatPlugin extends EasyFormatPlugin<LogFormatPlugin.LogFormatConfig> {
    +
    +  private static final boolean IS_COMPRESSIBLE = true;
    +  private static final String DEFAULT_NAME = "log";
    +  private LogFormatConfig config;
    +
    +  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LogFormatPlugin.class);
    +
    +  public LogFormatPlugin(String name, DrillbitContext context, Configuration fsConf, StoragePluginConfig storageConfig) {
    +    this(name, context, fsConf, storageConfig, new LogFormatConfig());
    +  }
    +
    +  public LogFormatPlugin(String name, DrillbitContext context, Configuration fsConf, StoragePluginConfig config, LogFormatConfig formatPluginConfig) {
    +    super(name, context, fsConf, config, formatPluginConfig, true, false, false, IS_COMPRESSIBLE, formatPluginConfig.getExtensions(), DEFAULT_NAME);
    +    this.config = formatPluginConfig;
    +  }
    +
    +  @Override
    +  public RecordReader getRecordReader(FragmentContext context, DrillFileSystem dfs, FileWork fileWork,
    +                                      List<SchemaPath> columns, String userName) throws ExecutionSetupException {
    +    return new LogRecordReader(context, fileWork.getPath(), dfs, columns, config);
    +  }
    +
    +  @Override
    +  public int getReaderOperatorType() {
    +    return UserBitShared.CoreOperatorType.JSON_SUB_SCAN_VALUE;
    +  }
    +
    +  @Override
    +  public int getWriterOperatorType() {
    +    throw new UnsupportedOperationException();
    +  }
    +
    +  @Override
    +  public boolean supportsPushDown() {
    +    return true;
    +  }
    +
    +  @Override
    +  public RecordWriter getRecordWriter(FragmentContext context, EasyWriter writer) throws IOException {
    +    return null;
    +  }
    +
    +  @JsonTypeName("log")
    +  public static class LogFormatConfig implements FormatPluginConfig {
    +    public List<String> extensions;
    +    public List<String> fieldNames;
    +    public List<String> dataTypes;
    +    public String dateFormat = "";
    +    public String timeFormat = "HH:mm:ss";
    +    public String pattern;
    +    public Boolean errorOnMismatch = false;
    +
    +    private static final List<String> DEFAULT_EXTS = ImmutableList.of("log");
    --- End diff --
    
    I added a default regex of `(.*)`, which should capture the entire line, as well as a default field name of `full_line`.


---

[GitHub] drill pull request #1114: Drill-6104: Added Logfile Reader

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1114#discussion_r167132178
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java ---
    @@ -0,0 +1,261 @@
    +package org.apache.drill.exec.store.log;
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +import io.netty.buffer.DrillBuf;
    +import org.apache.drill.common.exceptions.ExecutionSetupException;
    +import org.apache.drill.common.exceptions.UserException;
    +import org.apache.drill.common.expression.SchemaPath;
    +import org.apache.drill.exec.exception.OutOfMemoryException;
    +import org.apache.drill.exec.ops.FragmentContext;
    +import org.apache.drill.exec.ops.OperatorContext;
    +import org.apache.drill.exec.physical.impl.OutputMutator;
    +import org.apache.drill.exec.store.AbstractRecordReader;
    +import org.apache.drill.exec.store.dfs.DrillFileSystem;
    +import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
    +import org.apache.drill.exec.vector.complex.writer.BaseWriter;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FSDataInputStream;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.io.compress.CompressionCodec;
    +import org.apache.hadoop.io.compress.CompressionCodecFactory;
    +import org.apache.hadoop.io.compress.CompressionInputStream;
    +
    +import java.io.BufferedReader;
    +import java.io.IOException;
    +import java.io.InputStreamReader;
    +import java.text.ParseException;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.regex.Matcher;
    +import java.util.regex.Pattern;
    +
    +public class LogRecordReader extends AbstractRecordReader {
    +
    +  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LogRecordReader.class);
    +  private static final int MAX_RECORDS_PER_BATCH = 8096;
    +
    +  private String inputPath;
    +  private BufferedReader reader;
    +  private DrillBuf buffer;
    +  private VectorContainerWriter writer;
    +  private LogFormatPlugin.LogFormatConfig config;
    +  private int lineCount;
    +  private Pattern r;
    +
    +  private List<String> fieldNames;
    +  private List<String> dataTypes;
    +  private boolean errorOnMismatch;
    +  private String dateFormat;
    +  private String timeFormat;
    +  private java.text.DateFormat df;
    +  private java.text.DateFormat tf;
    +  private long time;
    +
    +  public LogRecordReader(FragmentContext fragmentContext, String inputPath, DrillFileSystem fileSystem,
    +                         List<SchemaPath> columns, LogFormatPlugin.LogFormatConfig config) throws OutOfMemoryException {
    +    try {
    +      Path hdfsPath = new Path(inputPath);
    +      Configuration conf = new Configuration();
    +      FSDataInputStream fsStream = fileSystem.open(hdfsPath);
    +      CompressionCodecFactory factory = new CompressionCodecFactory(conf);
    +      CompressionCodec codec = factory.getCodec(hdfsPath);
    +      if (codec == null) {
    +        reader = new BufferedReader(new InputStreamReader(fsStream.getWrappedStream(), "UTF-8"));
    +      } else {
    +        CompressionInputStream comInputStream = codec.createInputStream(fsStream.getWrappedStream());
    +        reader = new BufferedReader(new InputStreamReader(comInputStream));
    +      }
    +      this.inputPath = inputPath;
    +      this.lineCount = 0;
    +      this.config = config;
    +      this.buffer = fragmentContext.getManagedBuffer(4096);
    +      setColumns(columns);
    +
    +    } catch (IOException e) {
    +      logger.debug("Log Reader Plugin: " + e.getMessage());
    +    }
    +  }
    +
    +  public void setup(final OperatorContext context, final OutputMutator output) throws ExecutionSetupException {
    +    this.writer = new VectorContainerWriter(output);
    +    String regex = config.getPattern();
    +
    +
    +    fieldNames = config.getFieldNames();
    +    dataTypes = config.getDataTypes();
    +    dateFormat = config.getDateFormat();
    +    timeFormat = config.getTimeFormat();
    +    errorOnMismatch = config.getErrorOnMismatch();
    +
    +    /*
    +    This section will check for;
    +    1.  Empty regex
    +    2.  Invalid Regex
    +    3.  Empty date string if the date format is used
    +    4.  No capturing groups in the regex
    +    5.  Incorrect number of data types
    +    6.  Invalid data types
    +     */
    +    if (regex.isEmpty()) {
    +      throw UserException.parseError().message("Log parser requires a valid, non-empty regex in the plugin configuration").build(logger);
    +    } else {
    +      //TODO Check for invalid regex
    +      r = Pattern.compile(regex);
    +      Matcher m = r.matcher("test");
    +      if (m.groupCount() == 0) {
    +        throw UserException.parseError().message("Invalid Regular Expression: No Capturing Groups", 0).build(logger);
    +      } else if (m.groupCount() != (fieldNames.size())) {
    +        throw UserException.parseError().message("Invalid Regular Expression: Field names do not match capturing groups.  There are " + m.groupCount() + " captured groups in the data and " + fieldNames.size() + " specified in the configuration.", 0).build(logger);
    +
    +      } else if ((dataTypes == null) || m.groupCount() != dataTypes.size()) {
    +        //If the number of data types is not correct, create a list of varchar
    +        dataTypes = new ArrayList<String>();
    +        for (int i = -0; i < m.groupCount(); i++) {
    +          dataTypes.add("VARCHAR");
    +        }
    +      }
    +    }
    +
    +    //Check and set up date formats
    +    if (dataTypes.contains("DATE") || dataTypes.contains("TIMESTAMP")) {
    +      if (dateFormat != null && !dateFormat.isEmpty()) {
    +        df = new java.text.SimpleDateFormat(dateFormat);
    +      } else {
    +        throw UserException.parseError().message("Invalid date format.  The date formatting string was empty.  Please specify a valid date format string in the configuration for this data source.", 0).build(logger);
    +      }
    +    }
    +
    +    if (dataTypes.contains("TIME")) {
    +      if (timeFormat != null && !timeFormat.isEmpty()) {
    +        tf = new java.text.SimpleDateFormat(timeFormat);
    +      } else {
    +        throw UserException.parseError().message("Invalid time format.  The time formatting string was empty.  Please specify a valid time format string in the configuration for this data source.", 0).build(logger);
    +      }
    +    }
    +
    +  }
    +
    +  public int next() {
    +    this.writer.allocate();
    +    this.writer.reset();
    +
    +    int recordCount = 0;
    +
    +    try {
    +      BaseWriter.MapWriter map = this.writer.rootAsMap();
    +      String line = null;
    +
    +      while (recordCount < MAX_RECORDS_PER_BATCH && (line = this.reader.readLine()) != null) {
    +        lineCount++;
    +
    +        // Skip empty lines
    +        if (line.trim().length() == 0) {
    +          continue;
    +        }
    +
    +        this.writer.setPosition(recordCount);
    +        map.start();
    +
    +        Matcher m = r.matcher(line);
    +        if (m.find()) {
    +          for (int i = 1; i <= m.groupCount(); i++) {
    +
    +            String fieldName = fieldNames.get(i - 1);
    +            String type = dataTypes.get(i - 1);
    +            String fieldValue;
    +
    +            fieldValue = m.group(i);
    +
    +            if (fieldValue == null) {
    +              fieldValue = "";
    +            }
    +
    +            if (type.toUpperCase().equals("INT") || type.toUpperCase().equals("INTEGER")) {
    +              map.integer(fieldName).writeInt(Integer.parseInt(fieldValue));
    +            } else if (type.toUpperCase().equals("DOUBLE") || type.toUpperCase().equals("FLOAT8")) {
    +              map.float8(fieldName).writeFloat8(Double.parseDouble(fieldValue));
    --- End diff --
    
    The above is going to be slow as well. For speed, also cache the writers.
    
    Or, you can use the new `ResultSetLoader`, which provides indexed access to its writers.


---

[GitHub] drill pull request #1114: Drill-6104: Added Logfile Reader

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1114#discussion_r167128552
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogFormatPlugin.java ---
    @@ -0,0 +1,151 @@
    +package org.apache.drill.exec.store.log;
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +
    +import com.fasterxml.jackson.annotation.JsonInclude;
    +import com.fasterxml.jackson.annotation.JsonTypeName;
    +import com.google.common.collect.ImmutableList;
    +import org.apache.drill.common.exceptions.ExecutionSetupException;
    +import org.apache.drill.common.expression.SchemaPath;
    +import org.apache.drill.common.logical.FormatPluginConfig;
    +import org.apache.drill.common.logical.StoragePluginConfig;
    +import org.apache.drill.exec.ops.FragmentContext;
    +import org.apache.drill.exec.proto.UserBitShared;
    +import org.apache.drill.exec.server.DrillbitContext;
    +import org.apache.drill.exec.store.RecordReader;
    +import org.apache.drill.exec.store.RecordWriter;
    +import org.apache.drill.exec.store.dfs.DrillFileSystem;
    +import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
    +import org.apache.drill.exec.store.dfs.easy.EasyWriter;
    +import org.apache.drill.exec.store.dfs.easy.FileWork;
    +import org.apache.hadoop.conf.Configuration;
    +
    +import java.io.IOException;
    +import java.util.List;
    +
    +public class LogFormatPlugin extends EasyFormatPlugin<LogFormatPlugin.LogFormatConfig> {
    +
    +  private static final boolean IS_COMPRESSIBLE = true;
    +  private static final String DEFAULT_NAME = "log";
    +  private LogFormatConfig config;
    +
    +  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LogFormatPlugin.class);
    +
    +  public LogFormatPlugin(String name, DrillbitContext context, Configuration fsConf, StoragePluginConfig storageConfig) {
    +    this(name, context, fsConf, storageConfig, new LogFormatConfig());
    +  }
    +
    +  public LogFormatPlugin(String name, DrillbitContext context, Configuration fsConf, StoragePluginConfig config, LogFormatConfig formatPluginConfig) {
    +    super(name, context, fsConf, config, formatPluginConfig, true, false, false, IS_COMPRESSIBLE, formatPluginConfig.getExtensions(), DEFAULT_NAME);
    +    this.config = formatPluginConfig;
    +  }
    +
    +  @Override
    +  public RecordReader getRecordReader(FragmentContext context, DrillFileSystem dfs, FileWork fileWork,
    +                                      List<SchemaPath> columns, String userName) throws ExecutionSetupException {
    +    return new LogRecordReader(context, fileWork.getPath(), dfs, columns, config);
    +  }
    +
    +  @Override
    +  public int getReaderOperatorType() {
    +    return UserBitShared.CoreOperatorType.JSON_SUB_SCAN_VALUE;
    +  }
    +
    +  @Override
    +  public int getWriterOperatorType() {
    +    throw new UnsupportedOperationException();
    +  }
    +
    +  @Override
    +  public boolean supportsPushDown() {
    +    return true;
    +  }
    +
    +  @Override
    +  public RecordWriter getRecordWriter(FragmentContext context, EasyWriter writer) throws IOException {
    +    return null;
    +  }
    +
    +  @JsonTypeName("log")
    +  public static class LogFormatConfig implements FormatPluginConfig {
    +    public List<String> extensions;
    +    public List<String> fieldNames;
    +    public List<String> dataTypes;
    +    public String dateFormat = "";
    +    public String timeFormat = "HH:mm:ss";
    +    public String pattern;
    +    public Boolean errorOnMismatch = false;
    +
    +    private static final List<String> DEFAULT_EXTS = ImmutableList.of("log");
    +
    +    @JsonInclude(JsonInclude.Include.NON_DEFAULT)
    +    public List<String> getExtensions() {
    +      if (extensions == null) {
    +        return DEFAULT_EXTS;
    +      }
    +      return extensions;
    +    }
    +
    +    public List<String> getFieldNames() {
    +      return fieldNames;
    +    }
    +
    +    public List<String> getDataTypes() {
    +      return dataTypes;
    +    }
    +
    +    public String getDateFormat() {
    +      return dateFormat;
    +    }
    +
    +    public String getTimeFormat() {
    +
    +      return timeFormat;
    +    }
    +
    +    public String getPattern() {
    +      return pattern;
    +    }
    +
    +    public Boolean getErrorOnMismatch() {
    +      return errorOnMismatch;
    +    }
    +
    +    @Override
    +    public int hashCode() {
    +      int result = pattern != null ? pattern.hashCode() : 0;
    +      result = 31 * result + (dateFormat != null ? dateFormat.hashCode() : 0) + (timeFormat != null ? timeFormat.hashCode() : 0);
    +      return result;
    +    }
    +
    +    @Override
    +    public boolean equals(Object obj) {
    +      if (this == obj) {
    +        return true;
    +      } else if (obj == null) {
    +        return false;
    +      } else if (getClass() == obj.getClass()) {
    --- End diff --
    
    Two comments. First, is an exact class match required? Is `obj instanceof LogFormatConfig` sufficient?
    
    Second, the code does not actually do any member comparisons. Is it the intention that all configs are equal to one another? (I'm not sure how we even use this comparison, to be honest.)


---

[GitHub] drill pull request #1114: Drill-6104: Added Logfile Reader

Posted by cgivre <gi...@git.apache.org>.
Github user cgivre commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1114#discussion_r170675294
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogFormatPlugin.java ---
    @@ -0,0 +1,151 @@
    +package org.apache.drill.exec.store.log;
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +
    +import com.fasterxml.jackson.annotation.JsonInclude;
    +import com.fasterxml.jackson.annotation.JsonTypeName;
    +import com.google.common.collect.ImmutableList;
    +import org.apache.drill.common.exceptions.ExecutionSetupException;
    +import org.apache.drill.common.expression.SchemaPath;
    +import org.apache.drill.common.logical.FormatPluginConfig;
    +import org.apache.drill.common.logical.StoragePluginConfig;
    +import org.apache.drill.exec.ops.FragmentContext;
    +import org.apache.drill.exec.proto.UserBitShared;
    +import org.apache.drill.exec.server.DrillbitContext;
    +import org.apache.drill.exec.store.RecordReader;
    +import org.apache.drill.exec.store.RecordWriter;
    +import org.apache.drill.exec.store.dfs.DrillFileSystem;
    +import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
    +import org.apache.drill.exec.store.dfs.easy.EasyWriter;
    +import org.apache.drill.exec.store.dfs.easy.FileWork;
    +import org.apache.hadoop.conf.Configuration;
    +
    +import java.io.IOException;
    +import java.util.List;
    +
    +public class LogFormatPlugin extends EasyFormatPlugin<LogFormatPlugin.LogFormatConfig> {
    +
    +  private static final boolean IS_COMPRESSIBLE = true;
    +  private static final String DEFAULT_NAME = "log";
    +  private LogFormatConfig config;
    +
    +  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LogFormatPlugin.class);
    +
    +  public LogFormatPlugin(String name, DrillbitContext context, Configuration fsConf, StoragePluginConfig storageConfig) {
    +    this(name, context, fsConf, storageConfig, new LogFormatConfig());
    +  }
    +
    +  public LogFormatPlugin(String name, DrillbitContext context, Configuration fsConf, StoragePluginConfig config, LogFormatConfig formatPluginConfig) {
    +    super(name, context, fsConf, config, formatPluginConfig, true, false, false, IS_COMPRESSIBLE, formatPluginConfig.getExtensions(), DEFAULT_NAME);
    +    this.config = formatPluginConfig;
    +  }
    +
    +  @Override
    +  public RecordReader getRecordReader(FragmentContext context, DrillFileSystem dfs, FileWork fileWork,
    +                                      List<SchemaPath> columns, String userName) throws ExecutionSetupException {
    +    return new LogRecordReader(context, fileWork.getPath(), dfs, columns, config);
    +  }
    +
    +  @Override
    +  public int getReaderOperatorType() {
    +    return UserBitShared.CoreOperatorType.JSON_SUB_SCAN_VALUE;
    +  }
    +
    +  @Override
    +  public int getWriterOperatorType() {
    +    throw new UnsupportedOperationException();
    +  }
    +
    +  @Override
    +  public boolean supportsPushDown() {
    +    return true;
    +  }
    +
    +  @Override
    +  public RecordWriter getRecordWriter(FragmentContext context, EasyWriter writer) throws IOException {
    +    return null;
    +  }
    +
    +  @JsonTypeName("log")
    +  public static class LogFormatConfig implements FormatPluginConfig {
    +    public List<String> extensions;
    +    public List<String> fieldNames;
    +    public List<String> dataTypes;
    +    public String dateFormat = "";
    +    public String timeFormat = "HH:mm:ss";
    +    public String pattern;
    +    public Boolean errorOnMismatch = false;
    +
    +    private static final List<String> DEFAULT_EXTS = ImmutableList.of("log");
    +
    +    @JsonInclude(JsonInclude.Include.NON_DEFAULT)
    +    public List<String> getExtensions() {
    +      if (extensions == null) {
    +        return DEFAULT_EXTS;
    +      }
    +      return extensions;
    +    }
    +
    +    public List<String> getFieldNames() {
    +      return fieldNames;
    +    }
    +
    +    public List<String> getDataTypes() {
    +      return dataTypes;
    +    }
    +
    +    public String getDateFormat() {
    +      return dateFormat;
    +    }
    +
    +    public String getTimeFormat() {
    +
    +      return timeFormat;
    +    }
    +
    +    public String getPattern() {
    +      return pattern;
    +    }
    +
    +    public Boolean getErrorOnMismatch() {
    +      return errorOnMismatch;
    +    }
    +
    +    @Override
    +    public int hashCode() {
    +      int result = pattern != null ? pattern.hashCode() : 0;
    +      result = 31 * result + (dateFormat != null ? dateFormat.hashCode() : 0) + (timeFormat != null ? timeFormat.hashCode() : 0);
    --- End diff --
    
    Fixed


---

[GitHub] drill pull request #1114: Drill-6104: Added Logfile Reader

Posted by cgivre <gi...@git.apache.org>.
Github user cgivre commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1114#discussion_r170675324
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java ---
    @@ -0,0 +1,261 @@
    +package org.apache.drill.exec.store.log;
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +import io.netty.buffer.DrillBuf;
    +import org.apache.drill.common.exceptions.ExecutionSetupException;
    +import org.apache.drill.common.exceptions.UserException;
    +import org.apache.drill.common.expression.SchemaPath;
    +import org.apache.drill.exec.exception.OutOfMemoryException;
    +import org.apache.drill.exec.ops.FragmentContext;
    +import org.apache.drill.exec.ops.OperatorContext;
    +import org.apache.drill.exec.physical.impl.OutputMutator;
    +import org.apache.drill.exec.store.AbstractRecordReader;
    +import org.apache.drill.exec.store.dfs.DrillFileSystem;
    +import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
    +import org.apache.drill.exec.vector.complex.writer.BaseWriter;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FSDataInputStream;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.io.compress.CompressionCodec;
    +import org.apache.hadoop.io.compress.CompressionCodecFactory;
    +import org.apache.hadoop.io.compress.CompressionInputStream;
    +
    +import java.io.BufferedReader;
    +import java.io.IOException;
    +import java.io.InputStreamReader;
    +import java.text.ParseException;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.regex.Matcher;
    +import java.util.regex.Pattern;
    +
    +public class LogRecordReader extends AbstractRecordReader {
    +
    +  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LogRecordReader.class);
    +  private static final int MAX_RECORDS_PER_BATCH = 8096;
    +
    +  private String inputPath;
    +  private BufferedReader reader;
    +  private DrillBuf buffer;
    +  private VectorContainerWriter writer;
    +  private LogFormatPlugin.LogFormatConfig config;
    +  private int lineCount;
    +  private Pattern r;
    +
    +  private List<String> fieldNames;
    +  private List<String> dataTypes;
    +  private boolean errorOnMismatch;
    +  private String dateFormat;
    +  private String timeFormat;
    +  private java.text.DateFormat df;
    +  private java.text.DateFormat tf;
    +  private long time;
    +
    +  public LogRecordReader(FragmentContext fragmentContext, String inputPath, DrillFileSystem fileSystem,
    +                         List<SchemaPath> columns, LogFormatPlugin.LogFormatConfig config) throws OutOfMemoryException {
    +    try {
    +      Path hdfsPath = new Path(inputPath);
    +      Configuration conf = new Configuration();
    +      FSDataInputStream fsStream = fileSystem.open(hdfsPath);
    +      CompressionCodecFactory factory = new CompressionCodecFactory(conf);
    +      CompressionCodec codec = factory.getCodec(hdfsPath);
    +      if (codec == null) {
    +        reader = new BufferedReader(new InputStreamReader(fsStream.getWrappedStream(), "UTF-8"));
    +      } else {
    +        CompressionInputStream comInputStream = codec.createInputStream(fsStream.getWrappedStream());
    +        reader = new BufferedReader(new InputStreamReader(comInputStream));
    --- End diff --
    
    Fixed


---

[GitHub] drill pull request #1114: Drill-6104: Added Logfile Reader

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1114#discussion_r167130781
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java ---
    @@ -0,0 +1,261 @@
    +package org.apache.drill.exec.store.log;
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +import io.netty.buffer.DrillBuf;
    +import org.apache.drill.common.exceptions.ExecutionSetupException;
    +import org.apache.drill.common.exceptions.UserException;
    +import org.apache.drill.common.expression.SchemaPath;
    +import org.apache.drill.exec.exception.OutOfMemoryException;
    +import org.apache.drill.exec.ops.FragmentContext;
    +import org.apache.drill.exec.ops.OperatorContext;
    +import org.apache.drill.exec.physical.impl.OutputMutator;
    +import org.apache.drill.exec.store.AbstractRecordReader;
    +import org.apache.drill.exec.store.dfs.DrillFileSystem;
    +import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
    +import org.apache.drill.exec.vector.complex.writer.BaseWriter;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FSDataInputStream;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.io.compress.CompressionCodec;
    +import org.apache.hadoop.io.compress.CompressionCodecFactory;
    +import org.apache.hadoop.io.compress.CompressionInputStream;
    +
    +import java.io.BufferedReader;
    +import java.io.IOException;
    +import java.io.InputStreamReader;
    +import java.text.ParseException;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.regex.Matcher;
    +import java.util.regex.Pattern;
    +
    +public class LogRecordReader extends AbstractRecordReader {
    +
    +  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LogRecordReader.class);
    +  private static final int MAX_RECORDS_PER_BATCH = 8096;
    +
    +  private String inputPath;
    +  private BufferedReader reader;
    +  private DrillBuf buffer;
    +  private VectorContainerWriter writer;
    +  private LogFormatPlugin.LogFormatConfig config;
    +  private int lineCount;
    +  private Pattern r;
    +
    +  private List<String> fieldNames;
    +  private List<String> dataTypes;
    +  private boolean errorOnMismatch;
    +  private String dateFormat;
    +  private String timeFormat;
    +  private java.text.DateFormat df;
    +  private java.text.DateFormat tf;
    +  private long time;
    +
    +  public LogRecordReader(FragmentContext fragmentContext, String inputPath, DrillFileSystem fileSystem,
    +                         List<SchemaPath> columns, LogFormatPlugin.LogFormatConfig config) throws OutOfMemoryException {
    +    try {
    +      Path hdfsPath = new Path(inputPath);
    +      Configuration conf = new Configuration();
    +      FSDataInputStream fsStream = fileSystem.open(hdfsPath);
    +      CompressionCodecFactory factory = new CompressionCodecFactory(conf);
    +      CompressionCodec codec = factory.getCodec(hdfsPath);
    +      if (codec == null) {
    +        reader = new BufferedReader(new InputStreamReader(fsStream.getWrappedStream(), "UTF-8"));
    +      } else {
    +        CompressionInputStream comInputStream = codec.createInputStream(fsStream.getWrappedStream());
    +        reader = new BufferedReader(new InputStreamReader(comInputStream));
    +      }
    +      this.inputPath = inputPath;
    +      this.lineCount = 0;
    +      this.config = config;
    +      this.buffer = fragmentContext.getManagedBuffer(4096);
    +      setColumns(columns);
    +
    +    } catch (IOException e) {
    +      logger.debug("Log Reader Plugin: " + e.getMessage());
    +    }
    +  }
    +
    +  public void setup(final OperatorContext context, final OutputMutator output) throws ExecutionSetupException {
    +    this.writer = new VectorContainerWriter(output);
    +    String regex = config.getPattern();
    +
    +
    +    fieldNames = config.getFieldNames();
    +    dataTypes = config.getDataTypes();
    +    dateFormat = config.getDateFormat();
    +    timeFormat = config.getTimeFormat();
    +    errorOnMismatch = config.getErrorOnMismatch();
    +
    +    /*
    +    This section will check for;
    +    1.  Empty regex
    +    2.  Invalid Regex
    +    3.  Empty date string if the date format is used
    +    4.  No capturing groups in the regex
    +    5.  Incorrect number of data types
    +    6.  Invalid data types
    +     */
    +    if (regex.isEmpty()) {
    +      throw UserException.parseError().message("Log parser requires a valid, non-empty regex in the plugin configuration").build(logger);
    +    } else {
    +      //TODO Check for invalid regex
    +      r = Pattern.compile(regex);
    +      Matcher m = r.matcher("test");
    +      if (m.groupCount() == 0) {
    +        throw UserException.parseError().message("Invalid Regular Expression: No Capturing Groups", 0).build(logger);
    +      } else if (m.groupCount() != (fieldNames.size())) {
    +        throw UserException.parseError().message("Invalid Regular Expression: Field names do not match capturing groups.  There are " + m.groupCount() + " captured groups in the data and " + fieldNames.size() + " specified in the configuration.", 0).build(logger);
    +
    +      } else if ((dataTypes == null) || m.groupCount() != dataTypes.size()) {
    +        //If the number of data types is not correct, create a list of varchar
    +        dataTypes = new ArrayList<String>();
    +        for (int i = -0; i < m.groupCount(); i++) {
    +          dataTypes.add("VARCHAR");
    --- End diff --
    
    `MinorType.VARCHAR.name()`


---

[GitHub] drill pull request #1114: Drill-6104: Added Logfile Reader

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1114#discussion_r167130507
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java ---
    @@ -0,0 +1,261 @@
    +package org.apache.drill.exec.store.log;
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +import io.netty.buffer.DrillBuf;
    +import org.apache.drill.common.exceptions.ExecutionSetupException;
    +import org.apache.drill.common.exceptions.UserException;
    +import org.apache.drill.common.expression.SchemaPath;
    +import org.apache.drill.exec.exception.OutOfMemoryException;
    +import org.apache.drill.exec.ops.FragmentContext;
    +import org.apache.drill.exec.ops.OperatorContext;
    +import org.apache.drill.exec.physical.impl.OutputMutator;
    +import org.apache.drill.exec.store.AbstractRecordReader;
    +import org.apache.drill.exec.store.dfs.DrillFileSystem;
    +import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
    +import org.apache.drill.exec.vector.complex.writer.BaseWriter;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FSDataInputStream;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.io.compress.CompressionCodec;
    +import org.apache.hadoop.io.compress.CompressionCodecFactory;
    +import org.apache.hadoop.io.compress.CompressionInputStream;
    +
    +import java.io.BufferedReader;
    +import java.io.IOException;
    +import java.io.InputStreamReader;
    +import java.text.ParseException;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.regex.Matcher;
    +import java.util.regex.Pattern;
    +
    +public class LogRecordReader extends AbstractRecordReader {
    +
    +  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LogRecordReader.class);
    +  private static final int MAX_RECORDS_PER_BATCH = 8096;
    +
    +  private String inputPath;
    +  private BufferedReader reader;
    +  private DrillBuf buffer;
    +  private VectorContainerWriter writer;
    +  private LogFormatPlugin.LogFormatConfig config;
    +  private int lineCount;
    +  private Pattern r;
    +
    +  private List<String> fieldNames;
    +  private List<String> dataTypes;
    +  private boolean errorOnMismatch;
    +  private String dateFormat;
    +  private String timeFormat;
    +  private java.text.DateFormat df;
    +  private java.text.DateFormat tf;
    +  private long time;
    +
    +  public LogRecordReader(FragmentContext fragmentContext, String inputPath, DrillFileSystem fileSystem,
    +                         List<SchemaPath> columns, LogFormatPlugin.LogFormatConfig config) throws OutOfMemoryException {
    +    try {
    +      Path hdfsPath = new Path(inputPath);
    +      Configuration conf = new Configuration();
    +      FSDataInputStream fsStream = fileSystem.open(hdfsPath);
    +      CompressionCodecFactory factory = new CompressionCodecFactory(conf);
    +      CompressionCodec codec = factory.getCodec(hdfsPath);
    +      if (codec == null) {
    +        reader = new BufferedReader(new InputStreamReader(fsStream.getWrappedStream(), "UTF-8"));
    +      } else {
    +        CompressionInputStream comInputStream = codec.createInputStream(fsStream.getWrappedStream());
    +        reader = new BufferedReader(new InputStreamReader(comInputStream));
    +      }
    +      this.inputPath = inputPath;
    +      this.lineCount = 0;
    +      this.config = config;
    +      this.buffer = fragmentContext.getManagedBuffer(4096);
    +      setColumns(columns);
    +
    +    } catch (IOException e) {
    +      logger.debug("Log Reader Plugin: " + e.getMessage());
    +    }
    +  }
    +
    +  public void setup(final OperatorContext context, final OutputMutator output) throws ExecutionSetupException {
    +    this.writer = new VectorContainerWriter(output);
    +    String regex = config.getPattern();
    +
    +
    +    fieldNames = config.getFieldNames();
    +    dataTypes = config.getDataTypes();
    +    dateFormat = config.getDateFormat();
    +    timeFormat = config.getTimeFormat();
    +    errorOnMismatch = config.getErrorOnMismatch();
    +
    +    /*
    +    This section will check for;
    +    1.  Empty regex
    +    2.  Invalid Regex
    +    3.  Empty date string if the date format is used
    +    4.  No capturing groups in the regex
    +    5.  Incorrect number of data types
    +    6.  Invalid data types
    +     */
    +    if (regex.isEmpty()) {
    +      throw UserException.parseError().message("Log parser requires a valid, non-empty regex in the plugin configuration").build(logger);
    +    } else {
    +      //TODO Check for invalid regex
    +      r = Pattern.compile(regex);
    +      Matcher m = r.matcher("test");
    +      if (m.groupCount() == 0) {
    +        throw UserException.parseError().message("Invalid Regular Expression: No Capturing Groups", 0).build(logger);
    +      } else if (m.groupCount() != (fieldNames.size())) {
    +        throw UserException.parseError().message("Invalid Regular Expression: Field names do not match capturing groups.  There are " + m.groupCount() + " captured groups in the data and " + fieldNames.size() + " specified in the configuration.", 0).build(logger);
    +
    +      } else if ((dataTypes == null) || m.groupCount() != dataTypes.size()) {
    +        //If the number of data types is not correct, create a list of varchar
    --- End diff --
    
    Oh my! If I'm short one type, I get no warning, just the wrong data types?
    
    Maybe:
    
    * If null, silently fill in VarChar types.
    * If fewer types than fields, fill in missing types with VarChar, logging that we are doing this.



---

[GitHub] drill pull request #1114: Drill-6104: Added Logfile Reader

Posted by cgivre <gi...@git.apache.org>.
Github user cgivre commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1114#discussion_r170675202
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogFormatPlugin.java ---
    @@ -0,0 +1,151 @@
    +package org.apache.drill.exec.store.log;
    --- End diff --
    
    Fixed


---

[GitHub] drill pull request #1114: Drill-6104: Added Logfile Reader

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1114#discussion_r175244984
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogFormatPlugin.java ---
    @@ -0,0 +1,151 @@
    +package org.apache.drill.exec.store.log;
    --- End diff --
    
    The comment says fixed. Did you forget to commit the changes to your branch? Github still shows the original code...


---

[GitHub] drill pull request #1114: Drill-6104: Added Logfile Reader

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1114#discussion_r167128873
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogFormatPlugin.java ---
    @@ -0,0 +1,151 @@
    +package org.apache.drill.exec.store.log;
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +
    +import com.fasterxml.jackson.annotation.JsonInclude;
    +import com.fasterxml.jackson.annotation.JsonTypeName;
    +import com.google.common.collect.ImmutableList;
    +import org.apache.drill.common.exceptions.ExecutionSetupException;
    +import org.apache.drill.common.expression.SchemaPath;
    +import org.apache.drill.common.logical.FormatPluginConfig;
    +import org.apache.drill.common.logical.StoragePluginConfig;
    +import org.apache.drill.exec.ops.FragmentContext;
    +import org.apache.drill.exec.proto.UserBitShared;
    +import org.apache.drill.exec.server.DrillbitContext;
    +import org.apache.drill.exec.store.RecordReader;
    +import org.apache.drill.exec.store.RecordWriter;
    +import org.apache.drill.exec.store.dfs.DrillFileSystem;
    +import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
    +import org.apache.drill.exec.store.dfs.easy.EasyWriter;
    +import org.apache.drill.exec.store.dfs.easy.FileWork;
    +import org.apache.hadoop.conf.Configuration;
    +
    +import java.io.IOException;
    +import java.util.List;
    +
    +public class LogFormatPlugin extends EasyFormatPlugin<LogFormatPlugin.LogFormatConfig> {
    +
    +  private static final boolean IS_COMPRESSIBLE = true;
    +  private static final String DEFAULT_NAME = "log";
    +  private LogFormatConfig config;
    +
    +  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LogFormatPlugin.class);
    +
    +  public LogFormatPlugin(String name, DrillbitContext context, Configuration fsConf, StoragePluginConfig storageConfig) {
    +    this(name, context, fsConf, storageConfig, new LogFormatConfig());
    +  }
    +
    +  public LogFormatPlugin(String name, DrillbitContext context, Configuration fsConf, StoragePluginConfig config, LogFormatConfig formatPluginConfig) {
    +    super(name, context, fsConf, config, formatPluginConfig, true, false, false, IS_COMPRESSIBLE, formatPluginConfig.getExtensions(), DEFAULT_NAME);
    +    this.config = formatPluginConfig;
    +  }
    +
    +  @Override
    +  public RecordReader getRecordReader(FragmentContext context, DrillFileSystem dfs, FileWork fileWork,
    +                                      List<SchemaPath> columns, String userName) throws ExecutionSetupException {
    +    return new LogRecordReader(context, fileWork.getPath(), dfs, columns, config);
    +  }
    +
    +  @Override
    +  public int getReaderOperatorType() {
    +    return UserBitShared.CoreOperatorType.JSON_SUB_SCAN_VALUE;
    +  }
    +
    +  @Override
    +  public int getWriterOperatorType() {
    +    throw new UnsupportedOperationException();
    +  }
    +
    +  @Override
    +  public boolean supportsPushDown() {
    +    return true;
    +  }
    +
    +  @Override
    +  public RecordWriter getRecordWriter(FragmentContext context, EasyWriter writer) throws IOException {
    +    return null;
    +  }
    +
    +  @JsonTypeName("log")
    +  public static class LogFormatConfig implements FormatPluginConfig {
    +    public List<String> extensions;
    +    public List<String> fieldNames;
    +    public List<String> dataTypes;
    +    public String dateFormat = "";
    +    public String timeFormat = "HH:mm:ss";
    +    public String pattern;
    +    public Boolean errorOnMismatch = false;
    +
    +    private static final List<String> DEFAULT_EXTS = ImmutableList.of("log");
    +
    +    @JsonInclude(JsonInclude.Include.NON_DEFAULT)
    +    public List<String> getExtensions() {
    +      if (extensions == null) {
    +        return DEFAULT_EXTS;
    +      }
    +      return extensions;
    +    }
    +
    +    public List<String> getFieldNames() {
    +      return fieldNames;
    +    }
    +
    +    public List<String> getDataTypes() {
    +      return dataTypes;
    +    }
    +
    +    public String getDateFormat() {
    +      return dateFormat;
    +    }
    +
    +    public String getTimeFormat() {
    +
    +      return timeFormat;
    +    }
    +
    +    public String getPattern() {
    +      return pattern;
    +    }
    +
    +    public Boolean getErrorOnMismatch() {
    +      return errorOnMismatch;
    +    }
    +
    +    @Override
    +    public int hashCode() {
    +      int result = pattern != null ? pattern.hashCode() : 0;
    +      result = 31 * result + (dateFormat != null ? dateFormat.hashCode() : 0) + (timeFormat != null ? timeFormat.hashCode() : 0);
    --- End diff --
    
    This is incomplete; it does not hash all fields. Note that Guava (I believe) provides some handy tools to help with hashing, including handling null members. See [this file](https://github.com/paul-rogers/drill/blob/regex-plugin/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/regex/RegexFormatConfig.java) for an example.


---

[GitHub] drill pull request #1114: Drill-6104: Added Logfile Reader

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1114#discussion_r167130905
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java ---
    @@ -0,0 +1,261 @@
    +package org.apache.drill.exec.store.log;
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +import io.netty.buffer.DrillBuf;
    +import org.apache.drill.common.exceptions.ExecutionSetupException;
    +import org.apache.drill.common.exceptions.UserException;
    +import org.apache.drill.common.expression.SchemaPath;
    +import org.apache.drill.exec.exception.OutOfMemoryException;
    +import org.apache.drill.exec.ops.FragmentContext;
    +import org.apache.drill.exec.ops.OperatorContext;
    +import org.apache.drill.exec.physical.impl.OutputMutator;
    +import org.apache.drill.exec.store.AbstractRecordReader;
    +import org.apache.drill.exec.store.dfs.DrillFileSystem;
    +import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
    +import org.apache.drill.exec.vector.complex.writer.BaseWriter;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FSDataInputStream;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.io.compress.CompressionCodec;
    +import org.apache.hadoop.io.compress.CompressionCodecFactory;
    +import org.apache.hadoop.io.compress.CompressionInputStream;
    +
    +import java.io.BufferedReader;
    +import java.io.IOException;
    +import java.io.InputStreamReader;
    +import java.text.ParseException;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.regex.Matcher;
    +import java.util.regex.Pattern;
    +
    +public class LogRecordReader extends AbstractRecordReader {
    +
    +  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LogRecordReader.class);
    +  private static final int MAX_RECORDS_PER_BATCH = 8096;
    +
    +  private String inputPath;
    +  private BufferedReader reader;
    +  private DrillBuf buffer;
    +  private VectorContainerWriter writer;
    +  private LogFormatPlugin.LogFormatConfig config;
    +  private int lineCount;
    +  private Pattern r;
    +
    +  private List<String> fieldNames;
    +  private List<String> dataTypes;
    +  private boolean errorOnMismatch;
    +  private String dateFormat;
    +  private String timeFormat;
    +  private java.text.DateFormat df;
    +  private java.text.DateFormat tf;
    +  private long time;
    +
    +  public LogRecordReader(FragmentContext fragmentContext, String inputPath, DrillFileSystem fileSystem,
    +                         List<SchemaPath> columns, LogFormatPlugin.LogFormatConfig config) throws OutOfMemoryException {
    +    try {
    +      Path hdfsPath = new Path(inputPath);
    +      Configuration conf = new Configuration();
    +      FSDataInputStream fsStream = fileSystem.open(hdfsPath);
    +      CompressionCodecFactory factory = new CompressionCodecFactory(conf);
    +      CompressionCodec codec = factory.getCodec(hdfsPath);
    +      if (codec == null) {
    +        reader = new BufferedReader(new InputStreamReader(fsStream.getWrappedStream(), "UTF-8"));
    +      } else {
    +        CompressionInputStream comInputStream = codec.createInputStream(fsStream.getWrappedStream());
    +        reader = new BufferedReader(new InputStreamReader(comInputStream));
    +      }
    +      this.inputPath = inputPath;
    +      this.lineCount = 0;
    +      this.config = config;
    +      this.buffer = fragmentContext.getManagedBuffer(4096);
    +      setColumns(columns);
    +
    +    } catch (IOException e) {
    +      logger.debug("Log Reader Plugin: " + e.getMessage());
    +    }
    +  }
    +
    +  public void setup(final OperatorContext context, final OutputMutator output) throws ExecutionSetupException {
    +    this.writer = new VectorContainerWriter(output);
    +    String regex = config.getPattern();
    +
    +
    +    fieldNames = config.getFieldNames();
    +    dataTypes = config.getDataTypes();
    +    dateFormat = config.getDateFormat();
    +    timeFormat = config.getTimeFormat();
    +    errorOnMismatch = config.getErrorOnMismatch();
    +
    +    /*
    +    This section will check for;
    +    1.  Empty regex
    +    2.  Invalid Regex
    +    3.  Empty date string if the date format is used
    +    4.  No capturing groups in the regex
    +    5.  Incorrect number of data types
    +    6.  Invalid data types
    +     */
    +    if (regex.isEmpty()) {
    +      throw UserException.parseError().message("Log parser requires a valid, non-empty regex in the plugin configuration").build(logger);
    +    } else {
    +      //TODO Check for invalid regex
    +      r = Pattern.compile(regex);
    +      Matcher m = r.matcher("test");
    +      if (m.groupCount() == 0) {
    +        throw UserException.parseError().message("Invalid Regular Expression: No Capturing Groups", 0).build(logger);
    +      } else if (m.groupCount() != (fieldNames.size())) {
    +        throw UserException.parseError().message("Invalid Regular Expression: Field names do not match capturing groups.  There are " + m.groupCount() + " captured groups in the data and " + fieldNames.size() + " specified in the configuration.", 0).build(logger);
    +
    +      } else if ((dataTypes == null) || m.groupCount() != dataTypes.size()) {
    +        //If the number of data types is not correct, create a list of varchar
    +        dataTypes = new ArrayList<String>();
    +        for (int i = -0; i < m.groupCount(); i++) {
    +          dataTypes.add("VARCHAR");
    +        }
    +      }
    +    }
    +
    +    //Check and set up date formats
    +    if (dataTypes.contains("DATE") || dataTypes.contains("TIMESTAMP")) {
    --- End diff --
    
    As above.
    
    But Drill uses SQL rules: names are case-insensitive. So we need to use a different solution. (Does Guava have something?)


---

[GitHub] drill pull request #1114: Drill-6104: Added Logfile Reader

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1114#discussion_r167131709
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java ---
    @@ -0,0 +1,261 @@
    +package org.apache.drill.exec.store.log;
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +import io.netty.buffer.DrillBuf;
    +import org.apache.drill.common.exceptions.ExecutionSetupException;
    +import org.apache.drill.common.exceptions.UserException;
    +import org.apache.drill.common.expression.SchemaPath;
    +import org.apache.drill.exec.exception.OutOfMemoryException;
    +import org.apache.drill.exec.ops.FragmentContext;
    +import org.apache.drill.exec.ops.OperatorContext;
    +import org.apache.drill.exec.physical.impl.OutputMutator;
    +import org.apache.drill.exec.store.AbstractRecordReader;
    +import org.apache.drill.exec.store.dfs.DrillFileSystem;
    +import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
    +import org.apache.drill.exec.vector.complex.writer.BaseWriter;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FSDataInputStream;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.io.compress.CompressionCodec;
    +import org.apache.hadoop.io.compress.CompressionCodecFactory;
    +import org.apache.hadoop.io.compress.CompressionInputStream;
    +
    +import java.io.BufferedReader;
    +import java.io.IOException;
    +import java.io.InputStreamReader;
    +import java.text.ParseException;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.regex.Matcher;
    +import java.util.regex.Pattern;
    +
    +public class LogRecordReader extends AbstractRecordReader {
    +
    +  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LogRecordReader.class);
    +  private static final int MAX_RECORDS_PER_BATCH = 8096;
    +
    +  private String inputPath;
    +  private BufferedReader reader;
    +  private DrillBuf buffer;
    +  private VectorContainerWriter writer;
    +  private LogFormatPlugin.LogFormatConfig config;
    +  private int lineCount;
    +  private Pattern r;
    +
    +  private List<String> fieldNames;
    +  private List<String> dataTypes;
    +  private boolean errorOnMismatch;
    +  private String dateFormat;
    +  private String timeFormat;
    +  private java.text.DateFormat df;
    +  private java.text.DateFormat tf;
    +  private long time;
    +
    +  public LogRecordReader(FragmentContext fragmentContext, String inputPath, DrillFileSystem fileSystem,
    +                         List<SchemaPath> columns, LogFormatPlugin.LogFormatConfig config) throws OutOfMemoryException {
    +    try {
    +      Path hdfsPath = new Path(inputPath);
    +      Configuration conf = new Configuration();
    +      FSDataInputStream fsStream = fileSystem.open(hdfsPath);
    +      CompressionCodecFactory factory = new CompressionCodecFactory(conf);
    +      CompressionCodec codec = factory.getCodec(hdfsPath);
    +      if (codec == null) {
    +        reader = new BufferedReader(new InputStreamReader(fsStream.getWrappedStream(), "UTF-8"));
    +      } else {
    +        CompressionInputStream comInputStream = codec.createInputStream(fsStream.getWrappedStream());
    +        reader = new BufferedReader(new InputStreamReader(comInputStream));
    +      }
    +      this.inputPath = inputPath;
    +      this.lineCount = 0;
    +      this.config = config;
    +      this.buffer = fragmentContext.getManagedBuffer(4096);
    +      setColumns(columns);
    +
    +    } catch (IOException e) {
    +      logger.debug("Log Reader Plugin: " + e.getMessage());
    +    }
    +  }
    +
    +  public void setup(final OperatorContext context, final OutputMutator output) throws ExecutionSetupException {
    +    this.writer = new VectorContainerWriter(output);
    +    String regex = config.getPattern();
    +
    +
    +    fieldNames = config.getFieldNames();
    +    dataTypes = config.getDataTypes();
    +    dateFormat = config.getDateFormat();
    +    timeFormat = config.getTimeFormat();
    +    errorOnMismatch = config.getErrorOnMismatch();
    +
    +    /*
    +    This section will check for;
    +    1.  Empty regex
    +    2.  Invalid Regex
    +    3.  Empty date string if the date format is used
    +    4.  No capturing groups in the regex
    +    5.  Incorrect number of data types
    +    6.  Invalid data types
    +     */
    +    if (regex.isEmpty()) {
    +      throw UserException.parseError().message("Log parser requires a valid, non-empty regex in the plugin configuration").build(logger);
    +    } else {
    +      //TODO Check for invalid regex
    +      r = Pattern.compile(regex);
    +      Matcher m = r.matcher("test");
    +      if (m.groupCount() == 0) {
    +        throw UserException.parseError().message("Invalid Regular Expression: No Capturing Groups", 0).build(logger);
    +      } else if (m.groupCount() != (fieldNames.size())) {
    +        throw UserException.parseError().message("Invalid Regular Expression: Field names do not match capturing groups.  There are " + m.groupCount() + " captured groups in the data and " + fieldNames.size() + " specified in the configuration.", 0).build(logger);
    +
    +      } else if ((dataTypes == null) || m.groupCount() != dataTypes.size()) {
    +        //If the number of data types is not correct, create a list of varchar
    +        dataTypes = new ArrayList<String>();
    +        for (int i = -0; i < m.groupCount(); i++) {
    +          dataTypes.add("VARCHAR");
    +        }
    +      }
    +    }
    +
    +    //Check and set up date formats
    +    if (dataTypes.contains("DATE") || dataTypes.contains("TIMESTAMP")) {
    +      if (dateFormat != null && !dateFormat.isEmpty()) {
    +        df = new java.text.SimpleDateFormat(dateFormat);
    +      } else {
    +        throw UserException.parseError().message("Invalid date format.  The date formatting string was empty.  Please specify a valid date format string in the configuration for this data source.", 0).build(logger);
    +      }
    +    }
    +
    +    if (dataTypes.contains("TIME")) {
    +      if (timeFormat != null && !timeFormat.isEmpty()) {
    +        tf = new java.text.SimpleDateFormat(timeFormat);
    +      } else {
    +        throw UserException.parseError().message("Invalid time format.  The time formatting string was empty.  Please specify a valid time format string in the configuration for this data source.", 0).build(logger);
    +      }
    +    }
    +
    +  }
    +
    +  public int next() {
    +    this.writer.allocate();
    +    this.writer.reset();
    +
    +    int recordCount = 0;
    +
    +    try {
    +      BaseWriter.MapWriter map = this.writer.rootAsMap();
    +      String line = null;
    +
    +      while (recordCount < MAX_RECORDS_PER_BATCH && (line = this.reader.readLine()) != null) {
    +        lineCount++;
    +
    +        // Skip empty lines
    +        if (line.trim().length() == 0) {
    +          continue;
    +        }
    +
    +        this.writer.setPosition(recordCount);
    +        map.start();
    +
    +        Matcher m = r.matcher(line);
    +        if (m.find()) {
    +          for (int i = 1; i <= m.groupCount(); i++) {
    --- End diff --
    
    Why a `1`-based loop with the funny `i - 1` indexing? Just use a `0`-based loop and do `i + 1` as needed to fetch the group.


---

[GitHub] drill pull request #1114: Drill-6104: Added Logfile Reader

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1114#discussion_r167131439
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java ---
    @@ -0,0 +1,261 @@
    +package org.apache.drill.exec.store.log;
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +import io.netty.buffer.DrillBuf;
    +import org.apache.drill.common.exceptions.ExecutionSetupException;
    +import org.apache.drill.common.exceptions.UserException;
    +import org.apache.drill.common.expression.SchemaPath;
    +import org.apache.drill.exec.exception.OutOfMemoryException;
    +import org.apache.drill.exec.ops.FragmentContext;
    +import org.apache.drill.exec.ops.OperatorContext;
    +import org.apache.drill.exec.physical.impl.OutputMutator;
    +import org.apache.drill.exec.store.AbstractRecordReader;
    +import org.apache.drill.exec.store.dfs.DrillFileSystem;
    +import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
    +import org.apache.drill.exec.vector.complex.writer.BaseWriter;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FSDataInputStream;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.io.compress.CompressionCodec;
    +import org.apache.hadoop.io.compress.CompressionCodecFactory;
    +import org.apache.hadoop.io.compress.CompressionInputStream;
    +
    +import java.io.BufferedReader;
    +import java.io.IOException;
    +import java.io.InputStreamReader;
    +import java.text.ParseException;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.regex.Matcher;
    +import java.util.regex.Pattern;
    +
    +public class LogRecordReader extends AbstractRecordReader {
    +
    +  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LogRecordReader.class);
    +  private static final int MAX_RECORDS_PER_BATCH = 8096;
    +
    +  private String inputPath;
    +  private BufferedReader reader;
    +  private DrillBuf buffer;
    +  private VectorContainerWriter writer;
    +  private LogFormatPlugin.LogFormatConfig config;
    +  private int lineCount;
    +  private Pattern r;
    +
    +  private List<String> fieldNames;
    +  private List<String> dataTypes;
    +  private boolean errorOnMismatch;
    +  private String dateFormat;
    +  private String timeFormat;
    +  private java.text.DateFormat df;
    +  private java.text.DateFormat tf;
    +  private long time;
    +
    +  public LogRecordReader(FragmentContext fragmentContext, String inputPath, DrillFileSystem fileSystem,
    +                         List<SchemaPath> columns, LogFormatPlugin.LogFormatConfig config) throws OutOfMemoryException {
    +    try {
    +      Path hdfsPath = new Path(inputPath);
    +      Configuration conf = new Configuration();
    +      FSDataInputStream fsStream = fileSystem.open(hdfsPath);
    +      CompressionCodecFactory factory = new CompressionCodecFactory(conf);
    +      CompressionCodec codec = factory.getCodec(hdfsPath);
    +      if (codec == null) {
    +        reader = new BufferedReader(new InputStreamReader(fsStream.getWrappedStream(), "UTF-8"));
    +      } else {
    +        CompressionInputStream comInputStream = codec.createInputStream(fsStream.getWrappedStream());
    +        reader = new BufferedReader(new InputStreamReader(comInputStream));
    +      }
    +      this.inputPath = inputPath;
    +      this.lineCount = 0;
    +      this.config = config;
    +      this.buffer = fragmentContext.getManagedBuffer(4096);
    +      setColumns(columns);
    +
    +    } catch (IOException e) {
    +      logger.debug("Log Reader Plugin: " + e.getMessage());
    +    }
    +  }
    +
    +  public void setup(final OperatorContext context, final OutputMutator output) throws ExecutionSetupException {
    +    this.writer = new VectorContainerWriter(output);
    +    String regex = config.getPattern();
    +
    +
    +    fieldNames = config.getFieldNames();
    +    dataTypes = config.getDataTypes();
    +    dateFormat = config.getDateFormat();
    +    timeFormat = config.getTimeFormat();
    +    errorOnMismatch = config.getErrorOnMismatch();
    +
    +    /*
    +    This section will check for;
    +    1.  Empty regex
    +    2.  Invalid Regex
    +    3.  Empty date string if the date format is used
    +    4.  No capturing groups in the regex
    +    5.  Incorrect number of data types
    +    6.  Invalid data types
    +     */
    +    if (regex.isEmpty()) {
    +      throw UserException.parseError().message("Log parser requires a valid, non-empty regex in the plugin configuration").build(logger);
    +    } else {
    +      //TODO Check for invalid regex
    +      r = Pattern.compile(regex);
    +      Matcher m = r.matcher("test");
    +      if (m.groupCount() == 0) {
    +        throw UserException.parseError().message("Invalid Regular Expression: No Capturing Groups", 0).build(logger);
    +      } else if (m.groupCount() != (fieldNames.size())) {
    +        throw UserException.parseError().message("Invalid Regular Expression: Field names do not match capturing groups.  There are " + m.groupCount() + " captured groups in the data and " + fieldNames.size() + " specified in the configuration.", 0).build(logger);
    +
    +      } else if ((dataTypes == null) || m.groupCount() != dataTypes.size()) {
    +        //If the number of data types is not correct, create a list of varchar
    +        dataTypes = new ArrayList<String>();
    +        for (int i = -0; i < m.groupCount(); i++) {
    +          dataTypes.add("VARCHAR");
    +        }
    +      }
    +    }
    +
    +    //Check and set up date formats
    +    if (dataTypes.contains("DATE") || dataTypes.contains("TIMESTAMP")) {
    +      if (dateFormat != null && !dateFormat.isEmpty()) {
    +        df = new java.text.SimpleDateFormat(dateFormat);
    +      } else {
    +        throw UserException.parseError().message("Invalid date format.  The date formatting string was empty.  Please specify a valid date format string in the configuration for this data source.", 0).build(logger);
    +      }
    +    }
    +
    +    if (dataTypes.contains("TIME")) {
    +      if (timeFormat != null && !timeFormat.isEmpty()) {
    +        tf = new java.text.SimpleDateFormat(timeFormat);
    +      } else {
    +        throw UserException.parseError().message("Invalid time format.  The time formatting string was empty.  Please specify a valid time format string in the configuration for this data source.", 0).build(logger);
    +      }
    +    }
    +
    +  }
    +
    +  public int next() {
    +    this.writer.allocate();
    +    this.writer.reset();
    +
    +    int recordCount = 0;
    +
    +    try {
    +      BaseWriter.MapWriter map = this.writer.rootAsMap();
    +      String line = null;
    +
    +      while (recordCount < MAX_RECORDS_PER_BATCH && (line = this.reader.readLine()) != null) {
    +        lineCount++;
    +
    +        // Skip empty lines
    +        if (line.trim().length() == 0) {
    +          continue;
    +        }
    +
    +        this.writer.setPosition(recordCount);
    +        map.start();
    --- End diff --
    
    Do this below, after matching the line. Otherwise, we'll do this for non-matching lines as well.


---

[GitHub] drill pull request #1114: Drill-6104: Added Logfile Reader

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1114#discussion_r167129981
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java ---
    @@ -0,0 +1,261 @@
    +package org.apache.drill.exec.store.log;
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +import io.netty.buffer.DrillBuf;
    +import org.apache.drill.common.exceptions.ExecutionSetupException;
    +import org.apache.drill.common.exceptions.UserException;
    +import org.apache.drill.common.expression.SchemaPath;
    +import org.apache.drill.exec.exception.OutOfMemoryException;
    +import org.apache.drill.exec.ops.FragmentContext;
    +import org.apache.drill.exec.ops.OperatorContext;
    +import org.apache.drill.exec.physical.impl.OutputMutator;
    +import org.apache.drill.exec.store.AbstractRecordReader;
    +import org.apache.drill.exec.store.dfs.DrillFileSystem;
    +import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
    +import org.apache.drill.exec.vector.complex.writer.BaseWriter;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FSDataInputStream;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.io.compress.CompressionCodec;
    +import org.apache.hadoop.io.compress.CompressionCodecFactory;
    +import org.apache.hadoop.io.compress.CompressionInputStream;
    +
    +import java.io.BufferedReader;
    +import java.io.IOException;
    +import java.io.InputStreamReader;
    +import java.text.ParseException;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.regex.Matcher;
    +import java.util.regex.Pattern;
    +
    +public class LogRecordReader extends AbstractRecordReader {
    +
    +  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LogRecordReader.class);
    +  private static final int MAX_RECORDS_PER_BATCH = 8096;
    +
    +  private String inputPath;
    +  private BufferedReader reader;
    +  private DrillBuf buffer;
    +  private VectorContainerWriter writer;
    +  private LogFormatPlugin.LogFormatConfig config;
    +  private int lineCount;
    +  private Pattern r;
    +
    +  private List<String> fieldNames;
    +  private List<String> dataTypes;
    +  private boolean errorOnMismatch;
    +  private String dateFormat;
    +  private String timeFormat;
    +  private java.text.DateFormat df;
    +  private java.text.DateFormat tf;
    +  private long time;
    +
    +  public LogRecordReader(FragmentContext fragmentContext, String inputPath, DrillFileSystem fileSystem,
    +                         List<SchemaPath> columns, LogFormatPlugin.LogFormatConfig config) throws OutOfMemoryException {
    +    try {
    +      Path hdfsPath = new Path(inputPath);
    +      Configuration conf = new Configuration();
    +      FSDataInputStream fsStream = fileSystem.open(hdfsPath);
    +      CompressionCodecFactory factory = new CompressionCodecFactory(conf);
    +      CompressionCodec codec = factory.getCodec(hdfsPath);
    +      if (codec == null) {
    +        reader = new BufferedReader(new InputStreamReader(fsStream.getWrappedStream(), "UTF-8"));
    +      } else {
    +        CompressionInputStream comInputStream = codec.createInputStream(fsStream.getWrappedStream());
    +        reader = new BufferedReader(new InputStreamReader(comInputStream));
    +      }
    +      this.inputPath = inputPath;
    +      this.lineCount = 0;
    +      this.config = config;
    +      this.buffer = fragmentContext.getManagedBuffer(4096);
    +      setColumns(columns);
    +
    +    } catch (IOException e) {
    +      logger.debug("Log Reader Plugin: " + e.getMessage());
    +    }
    +  }
    +
    +  public void setup(final OperatorContext context, final OutputMutator output) throws ExecutionSetupException {
    +    this.writer = new VectorContainerWriter(output);
    +    String regex = config.getPattern();
    +
    +
    +    fieldNames = config.getFieldNames();
    +    dataTypes = config.getDataTypes();
    +    dateFormat = config.getDateFormat();
    +    timeFormat = config.getTimeFormat();
    +    errorOnMismatch = config.getErrorOnMismatch();
    +
    +    /*
    +    This section will check for;
    +    1.  Empty regex
    +    2.  Invalid Regex
    +    3.  Empty date string if the date format is used
    +    4.  No capturing groups in the regex
    +    5.  Incorrect number of data types
    +    6.  Invalid data types
    +     */
    +    if (regex.isEmpty()) {
    +      throw UserException.parseError().message("Log parser requires a valid, non-empty regex in the plugin configuration").build(logger);
    +    } else {
    +      //TODO Check for invalid regex
    +      r = Pattern.compile(regex);
    +      Matcher m = r.matcher("test");
    +      if (m.groupCount() == 0) {
    +        throw UserException.parseError().message("Invalid Regular Expression: No Capturing Groups", 0).build(logger);
    +      } else if (m.groupCount() != (fieldNames.size())) {
    --- End diff --
    
    Suggestion: `!=` -> `>=`. This allows incremental debugging: define all the fields, then work out the pattern for the first one, then the second, and so on. It is very hard to get all fields right the first time, and it is a nuisance to have to remove fields temporarily.


---

[GitHub] drill pull request #1114: Drill-6104: Added Logfile Reader

Posted by cgivre <gi...@git.apache.org>.
Github user cgivre commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1114#discussion_r170674795
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java ---
    @@ -0,0 +1,261 @@
    +package org.apache.drill.exec.store.log;
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +import io.netty.buffer.DrillBuf;
    +import org.apache.drill.common.exceptions.ExecutionSetupException;
    +import org.apache.drill.common.exceptions.UserException;
    +import org.apache.drill.common.expression.SchemaPath;
    +import org.apache.drill.exec.exception.OutOfMemoryException;
    +import org.apache.drill.exec.ops.FragmentContext;
    +import org.apache.drill.exec.ops.OperatorContext;
    +import org.apache.drill.exec.physical.impl.OutputMutator;
    +import org.apache.drill.exec.store.AbstractRecordReader;
    +import org.apache.drill.exec.store.dfs.DrillFileSystem;
    +import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
    +import org.apache.drill.exec.vector.complex.writer.BaseWriter;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FSDataInputStream;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.io.compress.CompressionCodec;
    +import org.apache.hadoop.io.compress.CompressionCodecFactory;
    +import org.apache.hadoop.io.compress.CompressionInputStream;
    +
    +import java.io.BufferedReader;
    +import java.io.IOException;
    +import java.io.InputStreamReader;
    +import java.text.ParseException;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.regex.Matcher;
    +import java.util.regex.Pattern;
    +
    +public class LogRecordReader extends AbstractRecordReader {
    +
    +  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LogRecordReader.class);
    +  private static final int MAX_RECORDS_PER_BATCH = 8096;
    +
    +  private String inputPath;
    +  private BufferedReader reader;
    +  private DrillBuf buffer;
    +  private VectorContainerWriter writer;
    +  private LogFormatPlugin.LogFormatConfig config;
    +  private int lineCount;
    +  private Pattern r;
    +
    +  private List<String> fieldNames;
    +  private List<String> dataTypes;
    +  private boolean errorOnMismatch;
    +  private String dateFormat;
    +  private String timeFormat;
    +  private java.text.DateFormat df;
    +  private java.text.DateFormat tf;
    +  private long time;
    +
    +  public LogRecordReader(FragmentContext fragmentContext, String inputPath, DrillFileSystem fileSystem,
    +                         List<SchemaPath> columns, LogFormatPlugin.LogFormatConfig config) throws OutOfMemoryException {
    +    try {
    +      Path hdfsPath = new Path(inputPath);
    +      Configuration conf = new Configuration();
    +      FSDataInputStream fsStream = fileSystem.open(hdfsPath);
    +      CompressionCodecFactory factory = new CompressionCodecFactory(conf);
    +      CompressionCodec codec = factory.getCodec(hdfsPath);
    +      if (codec == null) {
    +        reader = new BufferedReader(new InputStreamReader(fsStream.getWrappedStream(), "UTF-8"));
    +      } else {
    +        CompressionInputStream comInputStream = codec.createInputStream(fsStream.getWrappedStream());
    +        reader = new BufferedReader(new InputStreamReader(comInputStream));
    +      }
    +      this.inputPath = inputPath;
    +      this.lineCount = 0;
    +      this.config = config;
    +      this.buffer = fragmentContext.getManagedBuffer(4096);
    +      setColumns(columns);
    +
    +    } catch (IOException e) {
    +      logger.debug("Log Reader Plugin: " + e.getMessage());
    +    }
    +  }
    +
    +  public void setup(final OperatorContext context, final OutputMutator output) throws ExecutionSetupException {
    +    this.writer = new VectorContainerWriter(output);
    +    String regex = config.getPattern();
    +
    +
    +    fieldNames = config.getFieldNames();
    +    dataTypes = config.getDataTypes();
    +    dateFormat = config.getDateFormat();
    +    timeFormat = config.getTimeFormat();
    +    errorOnMismatch = config.getErrorOnMismatch();
    +
    +    /*
    +    This section will check for;
    +    1.  Empty regex
    +    2.  Invalid Regex
    +    3.  Empty date string if the date format is used
    +    4.  No capturing groups in the regex
    +    5.  Incorrect number of data types
    +    6.  Invalid data types
    +     */
    +    if (regex.isEmpty()) {
    +      throw UserException.parseError().message("Log parser requires a valid, non-empty regex in the plugin configuration").build(logger);
    +    } else {
    +      //TODO Check for invalid regex
    +      r = Pattern.compile(regex);
    +      Matcher m = r.matcher("test");
    +      if (m.groupCount() == 0) {
    +        throw UserException.parseError().message("Invalid Regular Expression: No Capturing Groups", 0).build(logger);
    +      } else if (m.groupCount() != (fieldNames.size())) {
    +        throw UserException.parseError().message("Invalid Regular Expression: Field names do not match capturing groups.  There are " + m.groupCount() + " captured groups in the data and " + fieldNames.size() + " specified in the configuration.", 0).build(logger);
    +
    +      } else if ((dataTypes == null) || m.groupCount() != dataTypes.size()) {
    +        //If the number of data types is not correct, create a list of varchar
    +        dataTypes = new ArrayList<String>();
    +        for (int i = -0; i < m.groupCount(); i++) {
    +          dataTypes.add("VARCHAR");
    +        }
    +      }
    +    }
    +
    +    //Check and set up date formats
    +    if (dataTypes.contains("DATE") || dataTypes.contains("TIMESTAMP")) {
    +      if (dateFormat != null && !dateFormat.isEmpty()) {
    +        df = new java.text.SimpleDateFormat(dateFormat);
    +      } else {
    +        throw UserException.parseError().message("Invalid date format.  The date formatting string was empty.  Please specify a valid date format string in the configuration for this data source.", 0).build(logger);
    +      }
    +    }
    +
    +    if (dataTypes.contains("TIME")) {
    +      if (timeFormat != null && !timeFormat.isEmpty()) {
    +        tf = new java.text.SimpleDateFormat(timeFormat);
    +      } else {
    +        throw UserException.parseError().message("Invalid time format.  The time formatting string was empty.  Please specify a valid time format string in the configuration for this data source.", 0).build(logger);
    +      }
    +    }
    +
    +  }
    +
    +  public int next() {
    +    this.writer.allocate();
    +    this.writer.reset();
    +
    +    int recordCount = 0;
    +
    +    try {
    +      BaseWriter.MapWriter map = this.writer.rootAsMap();
    +      String line = null;
    +
    +      while (recordCount < MAX_RECORDS_PER_BATCH && (line = this.reader.readLine()) != null) {
    +        lineCount++;
    +
    +        // Skip empty lines
    +        if (line.trim().length() == 0) {
    +          continue;
    +        }
    +
    +        this.writer.setPosition(recordCount);
    +        map.start();
    +
    +        Matcher m = r.matcher(line);
    +        if (m.find()) {
    +          for (int i = 1; i <= m.groupCount(); i++) {
    --- End diff --
    
    Fixed... Laziness on my part... 


---

[GitHub] drill pull request #1114: Drill-6104: Added Logfile Reader

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1114#discussion_r167130388
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java ---
    @@ -0,0 +1,261 @@
    +package org.apache.drill.exec.store.log;
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +import io.netty.buffer.DrillBuf;
    +import org.apache.drill.common.exceptions.ExecutionSetupException;
    +import org.apache.drill.common.exceptions.UserException;
    +import org.apache.drill.common.expression.SchemaPath;
    +import org.apache.drill.exec.exception.OutOfMemoryException;
    +import org.apache.drill.exec.ops.FragmentContext;
    +import org.apache.drill.exec.ops.OperatorContext;
    +import org.apache.drill.exec.physical.impl.OutputMutator;
    +import org.apache.drill.exec.store.AbstractRecordReader;
    +import org.apache.drill.exec.store.dfs.DrillFileSystem;
    +import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
    +import org.apache.drill.exec.vector.complex.writer.BaseWriter;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FSDataInputStream;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.io.compress.CompressionCodec;
    +import org.apache.hadoop.io.compress.CompressionCodecFactory;
    +import org.apache.hadoop.io.compress.CompressionInputStream;
    +
    +import java.io.BufferedReader;
    +import java.io.IOException;
    +import java.io.InputStreamReader;
    +import java.text.ParseException;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.regex.Matcher;
    +import java.util.regex.Pattern;
    +
    +public class LogRecordReader extends AbstractRecordReader {
    +
    +  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LogRecordReader.class);
    +  private static final int MAX_RECORDS_PER_BATCH = 8096;
    +
    +  private String inputPath;
    +  private BufferedReader reader;
    +  private DrillBuf buffer;
    +  private VectorContainerWriter writer;
    +  private LogFormatPlugin.LogFormatConfig config;
    +  private int lineCount;
    +  private Pattern r;
    +
    +  private List<String> fieldNames;
    +  private List<String> dataTypes;
    +  private boolean errorOnMismatch;
    +  private String dateFormat;
    +  private String timeFormat;
    +  private java.text.DateFormat df;
    +  private java.text.DateFormat tf;
    +  private long time;
    +
    +  public LogRecordReader(FragmentContext fragmentContext, String inputPath, DrillFileSystem fileSystem,
    +                         List<SchemaPath> columns, LogFormatPlugin.LogFormatConfig config) throws OutOfMemoryException {
    +    try {
    +      Path hdfsPath = new Path(inputPath);
    +      Configuration conf = new Configuration();
    +      FSDataInputStream fsStream = fileSystem.open(hdfsPath);
    +      CompressionCodecFactory factory = new CompressionCodecFactory(conf);
    +      CompressionCodec codec = factory.getCodec(hdfsPath);
    +      if (codec == null) {
    +        reader = new BufferedReader(new InputStreamReader(fsStream.getWrappedStream(), "UTF-8"));
    +      } else {
    +        CompressionInputStream comInputStream = codec.createInputStream(fsStream.getWrappedStream());
    +        reader = new BufferedReader(new InputStreamReader(comInputStream));
    +      }
    +      this.inputPath = inputPath;
    +      this.lineCount = 0;
    +      this.config = config;
    +      this.buffer = fragmentContext.getManagedBuffer(4096);
    +      setColumns(columns);
    +
    +    } catch (IOException e) {
    +      logger.debug("Log Reader Plugin: " + e.getMessage());
    +    }
    +  }
    +
    +  public void setup(final OperatorContext context, final OutputMutator output) throws ExecutionSetupException {
    +    this.writer = new VectorContainerWriter(output);
    +    String regex = config.getPattern();
    +
    +
    +    fieldNames = config.getFieldNames();
    +    dataTypes = config.getDataTypes();
    +    dateFormat = config.getDateFormat();
    +    timeFormat = config.getTimeFormat();
    +    errorOnMismatch = config.getErrorOnMismatch();
    +
    +    /*
    +    This section will check for;
    +    1.  Empty regex
    +    2.  Invalid Regex
    +    3.  Empty date string if the date format is used
    +    4.  No capturing groups in the regex
    +    5.  Incorrect number of data types
    +    6.  Invalid data types
    +     */
    +    if (regex.isEmpty()) {
    +      throw UserException.parseError().message("Log parser requires a valid, non-empty regex in the plugin configuration").build(logger);
    +    } else {
    +      //TODO Check for invalid regex
    +      r = Pattern.compile(regex);
    +      Matcher m = r.matcher("test");
    +      if (m.groupCount() == 0) {
    +        throw UserException.parseError().message("Invalid Regular Expression: No Capturing Groups", 0).build(logger);
    +      } else if (m.groupCount() != (fieldNames.size())) {
    +        throw UserException.parseError().message("Invalid Regular Expression: Field names do not match capturing groups.  There are " + m.groupCount() + " captured groups in the data and " + fieldNames.size() + " specified in the configuration.", 0).build(logger);
    --- End diff --
    
    Generally we try to keep lines to a reasonable size. Also, use the form that will do formatting for you. Suggest:
    
    ```
           throw UserException.parseError()
                .message("Invalid Regular Expression: Field names do not match capturing groups.  " +
                                 "There are %d captured groups in the data and " +
                                 "%d specified in the configuration.",
                                 m.groupCount(), fieldNames.size())
                .build(logger);
    ```
    
    Also, maybe shorten the message: `"Found %d groups but %d fields"`
    
    Further, since both might have been specified in the query (via table properties), it might be worthwhile to include the regex and the field names in the message.


---

[GitHub] drill pull request #1114: Drill-6104: Added Logfile Reader

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1114#discussion_r167129510
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java ---
    @@ -0,0 +1,261 @@
    +package org.apache.drill.exec.store.log;
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +import io.netty.buffer.DrillBuf;
    +import org.apache.drill.common.exceptions.ExecutionSetupException;
    +import org.apache.drill.common.exceptions.UserException;
    +import org.apache.drill.common.expression.SchemaPath;
    +import org.apache.drill.exec.exception.OutOfMemoryException;
    +import org.apache.drill.exec.ops.FragmentContext;
    +import org.apache.drill.exec.ops.OperatorContext;
    +import org.apache.drill.exec.physical.impl.OutputMutator;
    +import org.apache.drill.exec.store.AbstractRecordReader;
    +import org.apache.drill.exec.store.dfs.DrillFileSystem;
    +import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
    +import org.apache.drill.exec.vector.complex.writer.BaseWriter;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FSDataInputStream;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.io.compress.CompressionCodec;
    +import org.apache.hadoop.io.compress.CompressionCodecFactory;
    +import org.apache.hadoop.io.compress.CompressionInputStream;
    +
    +import java.io.BufferedReader;
    +import java.io.IOException;
    +import java.io.InputStreamReader;
    +import java.text.ParseException;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.regex.Matcher;
    +import java.util.regex.Pattern;
    +
    +public class LogRecordReader extends AbstractRecordReader {
    +
    +  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LogRecordReader.class);
    +  private static final int MAX_RECORDS_PER_BATCH = 8096;
    +
    +  private String inputPath;
    +  private BufferedReader reader;
    +  private DrillBuf buffer;
    +  private VectorContainerWriter writer;
    +  private LogFormatPlugin.LogFormatConfig config;
    +  private int lineCount;
    +  private Pattern r;
    +
    +  private List<String> fieldNames;
    +  private List<String> dataTypes;
    +  private boolean errorOnMismatch;
    +  private String dateFormat;
    +  private String timeFormat;
    +  private java.text.DateFormat df;
    +  private java.text.DateFormat tf;
    +  private long time;
    +
    +  public LogRecordReader(FragmentContext fragmentContext, String inputPath, DrillFileSystem fileSystem,
    +                         List<SchemaPath> columns, LogFormatPlugin.LogFormatConfig config) throws OutOfMemoryException {
    +    try {
    +      Path hdfsPath = new Path(inputPath);
    +      Configuration conf = new Configuration();
    +      FSDataInputStream fsStream = fileSystem.open(hdfsPath);
    +      CompressionCodecFactory factory = new CompressionCodecFactory(conf);
    +      CompressionCodec codec = factory.getCodec(hdfsPath);
    +      if (codec == null) {
    +        reader = new BufferedReader(new InputStreamReader(fsStream.getWrappedStream(), "UTF-8"));
    +      } else {
    +        CompressionInputStream comInputStream = codec.createInputStream(fsStream.getWrappedStream());
    +        reader = new BufferedReader(new InputStreamReader(comInputStream));
    +      }
    +      this.inputPath = inputPath;
    +      this.lineCount = 0;
    +      this.config = config;
    +      this.buffer = fragmentContext.getManagedBuffer(4096);
    +      setColumns(columns);
    +
    +    } catch (IOException e) {
    +      logger.debug("Log Reader Plugin: " + e.getMessage());
    +    }
    +  }
    +
    +  public void setup(final OperatorContext context, final OutputMutator output) throws ExecutionSetupException {
    +    this.writer = new VectorContainerWriter(output);
    +    String regex = config.getPattern();
    +
    +
    +    fieldNames = config.getFieldNames();
    +    dataTypes = config.getDataTypes();
    +    dateFormat = config.getDateFormat();
    +    timeFormat = config.getTimeFormat();
    +    errorOnMismatch = config.getErrorOnMismatch();
    +
    +    /*
    +    This section will check for;
    +    1.  Empty regex
    +    2.  Invalid Regex
    +    3.  Empty date string if the date format is used
    +    4.  No capturing groups in the regex
    +    5.  Incorrect number of data types
    +    6.  Invalid data types
    +     */
    +    if (regex.isEmpty()) {
    +      throw UserException.parseError().message("Log parser requires a valid, non-empty regex in the plugin configuration").build(logger);
    +    } else {
    +      //TODO Check for invalid regex
    +      r = Pattern.compile(regex);
    --- End diff --
    
    Catch the exception (`PatternSyntaxException`) that `Pattern.compile()` will throw if the regex is invalid.


---