You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by GitBox <gi...@apache.org> on 2018/07/19 06:09:42 UTC

[GitHub] Ben-Zvi closed pull request #1114: DRILL-6104: Added Logfile Reader

Ben-Zvi closed pull request #1114: DRILL-6104: Added Logfile Reader
URL: https://github.com/apache/drill/pull/1114
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogFormatConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogFormatConfig.java
new file mode 100644
index 00000000000..c3cf97e26c5
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogFormatConfig.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.log;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Objects;
+import org.apache.drill.common.logical.FormatPluginConfig;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Configuration of the regex-based log format plugin.
+ *
+ * <p>Holds the regular expression applied to each line, the file extension
+ * handled by the plugin, the number of tolerated errors (10 unless changed)
+ * and an optional list of field descriptions ("schema").</p>
+ */
+@JsonTypeName("logRegex")
+public class LogFormatConfig implements FormatPluginConfig {
+
+  private String regex;
+  private String extension;
+  private int maxErrors = 10;
+  private List<LogFormatField> schema;
+
+  /** @return the configured regular expression, or null if none was set */
+  public String getRegex() {
+    return regex;
+  }
+
+  /** @return the configured file extension, or null if none was set */
+  public String getExtension() {
+    return extension;
+  }
+
+  /** @return the configured error limit */
+  public int getMaxErrors() {
+    return maxErrors;
+  }
+
+  /** @return the field descriptions, or null if no schema was set */
+  public List<LogFormatField> getSchema() {
+    return schema;
+  }
+
+  //Setters
+  public void setExtension(String ext) {
+    this.extension = ext;
+  }
+
+  public void setMaxErrors(int errors) {
+    this.maxErrors = errors;
+  }
+
+  public void setRegex(String regex) {
+    this.regex = regex;
+  }
+
+  /** Replaces the schema with a new, empty, mutable list. */
+  public void setSchema() {
+    this.schema = new ArrayList<LogFormatField>();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    LogFormatConfig other = (LogFormatConfig) obj;
+    // maxErrors is a primitive: compare it directly rather than boxing it.
+    return maxErrors == other.maxErrors &&
+        Objects.equal(regex, other.regex) &&
+        Objects.equal(schema, other.schema) &&
+        Objects.equal(extension, other.extension);
+  }
+
+  @Override
+  public int hashCode() {
+    return Arrays.hashCode(new Object[]{regex, maxErrors, schema, extension});
+  }
+
+  /**
+   * Collects the names of all schema fields, in schema order.
+   *
+   * @return a new mutable list; empty when no schema is set
+   */
+  @JsonIgnore
+  public List<String> getFieldNames() {
+    List<String> result = new ArrayList<String>();
+    if (this.schema == null) {
+      return result;
+    }
+
+    for (LogFormatField field : this.schema) {
+      result.add(field.getFieldName());
+    }
+    return result;
+  }
+
+  /**
+   * Returns the upper-cased type name of the field at the given schema position.
+   * A field without a type is reported as VARCHAR, the documented default.
+   * The schema must be set and contain the index.
+   *
+   * @param fieldIndex position of the field in the schema
+   * @return the upper-cased type name, never null
+   */
+  @JsonIgnore
+  public String getDataType(int fieldIndex) {
+    String fieldType = this.schema.get(fieldIndex).getFieldType();
+    if (fieldType == null) {
+      return "VARCHAR";
+    }
+    // Locale.ROOT keeps the result stable (e.g. "int" -> "INT") whatever the
+    // JVM default locale is.
+    return fieldType.toUpperCase(java.util.Locale.ROOT);
+  }
+
+  /**
+   * @param fieldIndex position of the field in the schema
+   * @return the field description at that position; the schema must be set
+   */
+  @JsonIgnore
+  public LogFormatField getField(int fieldIndex) {
+    return this.schema.get(fieldIndex);
+  }
+
+  /**
+   * @param patternIndex position of the field in the schema
+   * @return the format string of that field, possibly null; the schema must be set
+   */
+  @JsonIgnore
+  public String getDateFormat(int patternIndex) {
+    LogFormatField f = this.schema.get(patternIndex);
+    return f.getFormat();
+  }
+}
\ No newline at end of file
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogFormatField.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogFormatField.java
new file mode 100644
index 00000000000..64a6db76ad7
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogFormatField.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.log;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+@JsonTypeName("regexReaderFieldDescription")
+@JsonInclude(JsonInclude.Include.NON_DEFAULT)
+public class LogFormatField {
+
+  /*
+   * The three configuration options for a field are:
+   * 1.  The field name
+   * 2.  The data type (fieldType).  Field type defaults to VARCHAR if it is not specified
+   * 3.  The format string which is used for date/time fields.  This field is ignored if used with a non
+   * date/time field.
+   * */
+
+  private String fieldName = "";
+  private String fieldType = "VARCHAR";
+  private String format;
+
+  //These will be used in the future for field validation and masking
+  //public String validator;
+  //public double minValue;
+  //public double maxValue;
+
+
+  public LogFormatField() {
+  }
+
+  //These constructors are used for unit testing
+  public LogFormatField(String fieldName) {
+    this(fieldName, null, null);
+  }
+
+  public LogFormatField(String fieldName, String fieldType) {
+    this(fieldName, fieldType, null);
+  }
+
+  public LogFormatField(String fieldName, String fieldType, String format) {
+    this.fieldName = fieldName;
+    this.fieldType = fieldType;
+    this.format = format;
+  }
+
+  public String getFieldName() {
+    return fieldName;
+  }
+
+  public String getFieldType() {
+    return fieldType;
+  }
+
+  public String getFormat() {
+    return format;
+  }
+
+
+  /*
+  public String getValidator() { return validator; }
+
+  public double getMinValue() { return minValue; }
+
+  public double getMaxValue() {
+    return maxValue;
+  }
+  */
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogFormatPlugin.java
new file mode 100644
index 00000000000..d2e6772891d
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogFormatPlugin.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.log;
+
+import com.google.common.collect.Lists;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.store.RecordWriter;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
+import org.apache.drill.exec.store.dfs.easy.EasyWriter;
+import org.apache.drill.exec.store.dfs.easy.FileWork;
+import org.apache.hadoop.conf.Configuration;
+
+import java.util.List;
+
+/**
+ * Read-only format plugin that hands files with the configured extension to
+ * {@link LogRecordReader}.
+ */
+public class LogFormatPlugin extends EasyFormatPlugin<LogFormatConfig> {
+
+  public static final String DEFAULT_NAME = "logRegex";
+  private final LogFormatConfig formatConfig;
+
+  public LogFormatPlugin(String name, DrillbitContext context,
+                         Configuration fsConf, StoragePluginConfig storageConfig,
+                         LogFormatConfig formatConfig) {
+    super(name, context, fsConf, storageConfig, formatConfig,
+        true,  // readable
+        false, // writable
+        // Not block-splittable: LogRecordReader opens the file at its start and
+        // reads to EOF without looking at a split's offset or length, so
+        // splitting a file would make every split return all of its rows.
+        false, // blockSplittable
+        // NOTE(review): the reader opens the file with DrillFileSystem.open();
+        // confirm that call decompresses compressed inputs.
+        true,  // compressible
+        Lists.newArrayList(formatConfig.getExtension()),
+        DEFAULT_NAME);
+    this.formatConfig = formatConfig;
+  }
+
+  /** Creates a {@link LogRecordReader} for the given file and projected columns. */
+  @Override
+  public RecordReader getRecordReader(FragmentContext context,
+                                      DrillFileSystem dfs, FileWork fileWork, List<SchemaPath> columns,
+                                      String userName) throws ExecutionSetupException {
+    return new LogRecordReader(context, dfs, fileWork,
+        columns, userName, formatConfig);
+  }
+
+  /** Always true; the record reader is handed the projected column list. */
+  @Override
+  public boolean supportsPushDown() {
+    return true;
+  }
+
+  /** Writing is not supported by this plugin. */
+  @Override
+  public RecordWriter getRecordWriter(FragmentContext context,
+                                      EasyWriter writer) throws UnsupportedOperationException {
+    throw new UnsupportedOperationException("unimplemented");
+  }
+
+  @Override
+  public int getReaderOperatorType() {
+    return UserBitShared.CoreOperatorType.REGEX_SUB_SCAN_VALUE;
+  }
+
+  /** Writing is not supported by this plugin. */
+  @Override
+  public int getWriterOperatorType() {
+    throw new UnsupportedOperationException("unimplemented");
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java
new file mode 100644
index 00000000000..58f07f28315
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java
@@ -0,0 +1,764 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.log;
+
+import com.google.common.base.Charsets;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.easy.FileWork;
+import org.apache.drill.exec.vector.NullableIntVector;
+import org.apache.drill.exec.vector.NullableBigIntVector;
+import org.apache.drill.exec.vector.NullableSmallIntVector;
+import org.apache.drill.exec.vector.NullableFloat4Vector;
+import org.apache.drill.exec.vector.NullableFloat8Vector;
+import org.apache.drill.exec.vector.BaseValueVector;
+import org.apache.drill.exec.vector.NullableDateVector;
+import org.apache.drill.exec.vector.NullableVarCharVector;
+import org.apache.drill.exec.vector.NullableTimeStampVector;
+import org.apache.drill.exec.vector.NullableTimeVector;
+
+
+import org.apache.hadoop.fs.Path;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Collection;
+import java.util.Date;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+
+public class LogRecordReader extends AbstractRecordReader {
+
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LogRecordReader.class);
+
+  private abstract static class ColumnDefn {
+    private final String name;
+    private final int index;
+    private final String format;
+
+    public ColumnDefn(String name, int index) {
+      this(name, index, null);
+    }
+
+    public ColumnDefn(String name, int index, String format) {
+      this.name = name;
+      this.index = index;
+      this.format = format;
+    }
+
+    public abstract void define(OutputMutator outputMutator) throws SchemaChangeException;
+
+    public abstract void load(int rowIndex, String value);
+
+    public String getName() { return this.name; }
+
+    public int getIndex() { return this.index; }
+
+    public String getFormat() { return this.format;}
+
+    @Override
+    //For testing
+    public String toString() {
+      return "Name: " + name + ", Index: " + index + ", Format: " + format;
+    }
+  }
+
+  /** Column that stores the captured text unchanged in a nullable VARCHAR vector. */
+  private static class VarCharDefn extends ColumnDefn {
+
+    private NullableVarCharVector.Mutator mutator;
+
+    public VarCharDefn(String name, int index) {
+      super(name, index);
+    }
+
+    @Override
+    public void define(OutputMutator outputMutator) throws SchemaChangeException {
+      MaterializedField field = MaterializedField.create(getName(),
+          Types.optional(MinorType.VARCHAR));
+      mutator = outputMutator.addField(field, NullableVarCharVector.class).getMutator();
+    }
+
+    @Override
+    public void load(int rowIndex, String value) {
+      // Encode explicitly as UTF-8 (the charset the file is decoded with)
+      // instead of relying on the platform default charset.
+      mutator.set(rowIndex, value.getBytes(Charsets.UTF_8));
+    }
+  }
+
+  /** Column that parses the captured text as a long into a nullable BIGINT vector. */
+  private static class BigIntDefn extends ColumnDefn {
+
+    private NullableBigIntVector.Mutator mutator;
+
+    public BigIntDefn(String name, int index) {
+      super(name, index);
+    }
+
+    @Override
+    public void define(OutputMutator outputMutator) throws SchemaChangeException {
+      MaterializedField field = MaterializedField.create(getName(),
+          Types.optional(MinorType.BIGINT));
+      mutator = outputMutator.addField(field, NullableBigIntVector.class).getMutator();
+    }
+
+    /** Raises a data-read UserException when the text is not a valid long. */
+    @Override
+    public void load(int rowIndex, String value) {
+      try {
+        mutator.set(rowIndex, Long.parseLong(value));
+      } catch (NumberFormatException e) {
+        throw UserException
+            .dataReadError(e)
+            // Name the actual column type rather than INT.
+            .addContext("Failed to parse a BIGINT field")
+            .addContext("Column", getName())
+            .addContext("Position", getIndex())
+            .addContext("Value", value)
+            .build(logger);
+      }
+    }
+  }
+
+  /** Column that parses the captured text as a short into a nullable SMALLINT vector. */
+  private static class SmallIntDefn extends ColumnDefn {
+
+    private NullableSmallIntVector.Mutator mutator;
+
+    public SmallIntDefn(String name, int index) {
+      super(name, index);
+    }
+
+    @Override
+    public void define(OutputMutator outputMutator) throws SchemaChangeException {
+      MaterializedField field = MaterializedField.create(getName(),
+          Types.optional(MinorType.SMALLINT));
+      mutator = outputMutator.addField(field, NullableSmallIntVector.class).getMutator();
+    }
+
+    /** Raises a data-read UserException when the text is not a valid short. */
+    @Override
+    public void load(int rowIndex, String value) {
+      try {
+        mutator.set(rowIndex, Short.parseShort(value));
+      } catch (NumberFormatException e) {
+        throw UserException
+            .dataReadError(e)
+            // Name the actual column type rather than INT.
+            .addContext("Failed to parse a SMALLINT field")
+            .addContext("Column", getName())
+            .addContext("Position", getIndex())
+            .addContext("Value", value)
+            .build(logger);
+      }
+    }
+  }
+
+  /** Column that parses the captured text as an int into a nullable INT vector. */
+  private static class IntDefn extends ColumnDefn {
+
+    private NullableIntVector.Mutator mutator;
+
+    public IntDefn(String name, int index) {
+      super(name, index);
+    }
+
+    @Override
+    public void define(OutputMutator outputMutator) throws SchemaChangeException {
+      final MaterializedField intField =
+          MaterializedField.create(getName(), Types.optional(MinorType.INT));
+      final NullableIntVector vector = outputMutator.addField(intField, NullableIntVector.class);
+      mutator = vector.getMutator();
+    }
+
+    /** Raises a data-read UserException when the text is not a valid int. */
+    @Override
+    public void load(int rowIndex, String value) {
+      final int parsed;
+      try {
+        parsed = Integer.parseInt(value);
+      } catch (NumberFormatException e) {
+        throw UserException
+            .dataReadError(e)
+            .addContext("Failed to parse an INT field")
+            .addContext("Column", getName())
+            .addContext("Position", getIndex())
+            .addContext("Value", value)
+            .build(logger);
+      }
+      mutator.set(rowIndex, parsed);
+    }
+  }
+
+  /** Column that parses the captured text as a float into a nullable FLOAT4 vector. */
+  private static class Float4Defn extends ColumnDefn {
+
+    private NullableFloat4Vector.Mutator mutator;
+
+    public Float4Defn(String name, int index) {
+      super(name, index);
+    }
+
+    @Override
+    public void define(OutputMutator outputMutator) throws SchemaChangeException {
+      final MaterializedField floatField =
+          MaterializedField.create(getName(), Types.optional(MinorType.FLOAT4));
+      final NullableFloat4Vector vector = outputMutator.addField(floatField, NullableFloat4Vector.class);
+      mutator = vector.getMutator();
+    }
+
+    /** Raises a data-read UserException when the text is not a valid float. */
+    @Override
+    public void load(int rowIndex, String value) {
+      final float parsed;
+      try {
+        parsed = Float.parseFloat(value);
+      } catch (NumberFormatException e) {
+        throw UserException
+            .dataReadError(e)
+            .addContext("Failed to parse an FLOAT field")
+            .addContext("Column", getName())
+            .addContext("Position", getIndex())
+            .addContext("Value", value)
+            .build(logger);
+      }
+      mutator.set(rowIndex, parsed);
+    }
+  }
+
+  /** Column that parses the captured text as a double into a nullable FLOAT8 vector. */
+  private static class DoubleDefn extends ColumnDefn {
+
+    private NullableFloat8Vector.Mutator mutator;
+
+    public DoubleDefn(String name, int index) {
+      super(name, index);
+    }
+
+    @Override
+    public void define(OutputMutator outputMutator) throws SchemaChangeException {
+      final MaterializedField doubleField =
+          MaterializedField.create(getName(), Types.optional(MinorType.FLOAT8));
+      final NullableFloat8Vector vector = outputMutator.addField(doubleField, NullableFloat8Vector.class);
+      mutator = vector.getMutator();
+    }
+
+    /** Raises a data-read UserException when the text is not a valid double. */
+    @Override
+    public void load(int rowIndex, String value) {
+      final double parsed;
+      try {
+        parsed = Double.parseDouble(value);
+      } catch (NumberFormatException e) {
+        throw UserException
+            .dataReadError(e)
+            .addContext("Failed to parse an FLOAT field")
+            .addContext("Column", getName())
+            .addContext("Position", getIndex())
+            .addContext("Value", value)
+            .build(logger);
+      }
+      mutator.set(rowIndex, parsed);
+    }
+  }
+
+  private static class DateDefn extends ColumnDefn {
+
+    private NullableDateVector.Mutator mutator;
+    private SimpleDateFormat df;
+
+    public DateDefn(String name, int index, String dateFormat) {
+      super(name, index, dateFormat);
+      df = getValidDateObject(dateFormat);
+    }
+
+    private SimpleDateFormat getValidDateObject(String d) {
+      SimpleDateFormat tempDateFormat;
+      if (d != null && !d.isEmpty()) {
+        tempDateFormat = new SimpleDateFormat(d);
+      } else {
+        throw UserException.parseError()
+            .message("Invalid date format.  The date formatting string was empty.")
+            .build(logger);
+      }
+      return tempDateFormat;
+    }
+
+    @Override
+    public void define(OutputMutator outputMutator) throws SchemaChangeException {
+      MaterializedField field = MaterializedField.create(getName(),
+          Types.optional(MinorType.DATE));
+      mutator = outputMutator.addField(field, NullableDateVector.class).getMutator();
+    }
+
+    @Override
+    public void load(int rowIndex, String value) {
+      try {
+        Date d = df.parse(value);
+        long milliseconds = d.getTime();
+        mutator.set(rowIndex, milliseconds);
+      } catch (NumberFormatException e) {
+        throw UserException
+            .dataReadError(e)
+            .addContext("Failed to parse an DATE field")
+            .addContext("Column", getName())
+            .addContext("Position", getIndex())
+            .addContext("Value", value)
+            .build(logger);
+      } catch (ParseException e) {
+        throw UserException
+            .dataReadError(e)
+            .addContext("Date Format String does not match field value.")
+            .addContext("Column", getName())
+            .addContext("Position", getIndex())
+            .addContext("Format String", getFormat())
+            .addContext("Value", value)
+            .build(logger);
+      }
+    }
+  }
+
+  private static class TimeDefn extends ColumnDefn {
+
+    private NullableTimeVector.Mutator mutator;
+    private SimpleDateFormat df;
+
+    public TimeDefn(String name, int index, String dateFormat) {
+      super(name, index, dateFormat);
+      df = getValidDateObject(dateFormat);
+    }
+
+    private SimpleDateFormat getValidDateObject(String d) {
+      SimpleDateFormat tempDateFormat;
+      if (d != null && !d.isEmpty()) {
+        tempDateFormat = new SimpleDateFormat(d);
+      } else {
+        throw UserException.parseError()
+            .message("Invalid date format.  The date formatting string was empty.")
+            .build(logger);
+      }
+      return tempDateFormat;
+    }
+
+    @Override
+    public void define(OutputMutator outputMutator) throws SchemaChangeException {
+      MaterializedField field = MaterializedField.create(getName(),
+          Types.optional(MinorType.TIME));
+      mutator = outputMutator.addField(field, NullableTimeVector.class).getMutator();
+    }
+
+    @Override
+    public void load(int rowIndex, String value) {
+      try {
+        Date d = df.parse(value);
+        int milliseconds = (int) d.getTime();
+        mutator.set(rowIndex, milliseconds);
+      } catch (NumberFormatException e) {
+        throw UserException
+            .dataReadError(e)
+            .addContext("Failed to parse an Time field")
+            .addContext("Column", getName())
+            .addContext("Position", getIndex())
+            .addContext("Value", value)
+            .build(logger);
+      } catch (ParseException e) {
+        throw UserException
+            .dataReadError(e)
+            .addContext("Date Format String does not match field value.")
+            .addContext("Column", getName())
+            .addContext("Position", getIndex())
+            .addContext("Format String", getFormat())
+            .addContext("Value", value)
+            .build(logger);
+      }
+    }
+  }
+
+  private static class TimeStampDefn extends ColumnDefn {
+
+    private NullableTimeStampVector.Mutator mutator;
+    private SimpleDateFormat df;
+
+    public TimeStampDefn(String name, int index, String dateFormat) {
+      super(name, index, dateFormat);
+      df = getValidDateObject(dateFormat);
+    }
+
+    private SimpleDateFormat getValidDateObject(String d) {
+      SimpleDateFormat tempDateFormat;
+      if (d != null && !d.isEmpty()) {
+        tempDateFormat = new SimpleDateFormat(d);
+      } else {
+        throw UserException.parseError()
+            .message("Invalid date format.  The date formatting string was empty.")
+            .build(logger);
+      }
+      return tempDateFormat;
+    }
+
+    @Override
+    public void define(OutputMutator outputMutator) throws SchemaChangeException {
+      MaterializedField field = MaterializedField.create(getName(),
+          Types.optional(MinorType.TIMESTAMP));
+      mutator = outputMutator.addField(field, NullableTimeStampVector.class).getMutator();
+    }
+
+    @Override
+    public void load(int rowIndex, String value) {
+      try {
+        Date d = df.parse(value);
+        long milliseconds = d.getTime();
+        mutator.set(rowIndex, milliseconds);
+      } catch (NumberFormatException e) {
+        throw UserException
+            .dataReadError(e)
+            .addContext("Failed to parse a TIMESTAMP field")
+            .addContext("Column", getName())
+            .addContext("Position", getIndex())
+            .addContext("Value", value)
+            .build(logger);
+      } catch (ParseException e) {
+        throw UserException
+            .dataReadError(e)
+            .addContext("Date Format String does not match field value.")
+            .addContext("Column", getName())
+            .addContext("Position", getIndex())
+            .addContext("Format String", getFormat())
+            .addContext("Value", value)
+            .build(logger);
+      }
+    }
+  }
+
+  // Row limit per batch: nextLine() stops filling once rowIndex reaches it.
+  private static final int BATCH_SIZE = BaseValueVector.INITIAL_VALUE_ALLOCATION;
+
+  // File system and file description used by openFile().
+  private final DrillFileSystem dfs;
+  private final FileWork fileWork;
+  // Only used as context in the "failed to open" error.
+  private final String userName;
+  private final LogFormatConfig formatConfig;
+  // One definition per output column; built by the project*() methods.
+  private ColumnDefn columns[];
+  // Compiled from the configured regex in setupPattern().
+  private Pattern pattern;
+  private BufferedReader reader;
+  // Reset to 0 at the start of each next() call, which returns its final value.
+  private int rowIndex;
+  // Number of capturing groups in the pattern.
+  private int capturingGroups;
+  private OutputMutator outputMutator;
+  // Position of the projected "_unmatched_rows" column, or -1 if not projected.
+  private int unmatchedColumnIndex;
+  private int unmatchedRowIndex;
+  // True when "_unmatched_rows" is projected.
+  private boolean unmatchedRows;
+  // Error limit copied from the format configuration.
+  private int maxErrors;
+
+
+  private int errorCount;
+
+
+  /**
+   * Creates a reader for one log file.
+   *
+   * @param context fragment context (not used by this constructor)
+   * @param dfs file system used to open the file
+   * @param fileWork description of the file to read
+   * @param columns projected columns, handed to the superclass for parsing
+   * @param userName user name reported when the file cannot be opened
+   * @param formatConfig plugin configuration (regex, schema, error limit)
+   * @throws UserException if the configured error limit is negative
+   */
+  public LogRecordReader(FragmentContext context, DrillFileSystem dfs,
+                         FileWork fileWork, List<SchemaPath> columns, String userName,
+                         LogFormatConfig formatConfig) {
+    this.dfs = dfs;
+    this.fileWork = fileWork;
+    this.userName = userName;
+    this.formatConfig = formatConfig;
+    this.unmatchedColumnIndex = -1;
+    this.unmatchedRowIndex = 0;
+    this.unmatchedRows = false;
+    this.maxErrors = formatConfig.getMaxErrors();
+
+    // Ask the superclass to parse the projection list.
+    setColumns(columns);
+
+    // Zero is accepted; only negative limits are rejected, and the message
+    // now says so.
+    if (maxErrors < 0) {
+      throw UserException
+          .validationError()
+          .message("Max Errors must be zero or a positive integer.")
+          .build(logger);
+    }
+
+  }
+
+  /**
+   * Prepares the reader: compiles the regex, opens the input file, resolves
+   * the projected columns and creates their vectors, in that order.
+   */
+  @Override
+  public void setup(final OperatorContext context, final OutputMutator output) {
+    outputMutator = output;
+
+    setupPattern();      // compiles the regex and counts its groups
+    openFile();          // opens the input as a UTF-8 reader
+    setupProjection();   // builds the column definitions
+    defineVectors();     // registers one vector per column
+  }
+
+  /**
+   * Compiles the configured regex and records how many capturing groups it has.
+   *
+   * @throws UserException if no regex is configured or it fails to compile
+   */
+  private void setupPattern() {
+    String regex = formatConfig.getRegex();
+    if (regex == null) {
+      // Pattern.compile(null) would fail with a bare NullPointerException.
+      throw UserException
+          .validationError()
+          .message("The log format plugin requires a regex, but none was configured.")
+          .build(logger);
+    }
+    try {
+      this.pattern = Pattern.compile(regex);
+      // groupCount() depends only on the pattern; the input text is irrelevant.
+      Matcher m = pattern.matcher("test");
+      capturingGroups = m.groupCount();
+    } catch (PatternSyntaxException e) {
+      throw UserException
+          .validationError(e)
+          .message("Failed to parse regex: \"%s\"", regex)
+          .build(logger);
+    }
+  }
+
+  /**
+   * Chooses how to build the column definitions: a dummy column for a skip
+   * query, every group for a star query, otherwise only the projected columns.
+   */
+  private void setupProjection() {
+    if (isSkipQuery()) {
+      projectNone();
+      return;
+    }
+    if (isStarQuery()) {
+      projectAll();
+      return;
+    }
+    projectSubset();
+  }
+
+  private void projectNone() {
+    columns = new ColumnDefn[]{new VarCharDefn("dummy", -1)};
+  }
+
+  /**
+   * Opens the input file and wraps it in a buffered UTF-8 reader.
+   *
+   * @throws UserException if the file cannot be opened
+   */
+  private void openFile() {
+    InputStream in;
+    try {
+      in = dfs.open(new Path(fileWork.getPath()));
+    } catch (Exception e) {
+      throw UserException
+          .dataReadError(e)
+          // Fixed the duplicated word ("open open") in the message.
+          .message("Failed to open input file: %s", fileWork.getPath())
+          .addContext("User name", userName)
+          .build(logger);
+    }
+    reader = new BufferedReader(new InputStreamReader(in, Charsets.UTF_8));
+  }
+
+  private void projectAll() {
+    List<String> fields = formatConfig.getFieldNames();
+    for (int i = fields.size(); i < capturingGroups; i++) {
+      fields.add("field_" + i);
+    }
+    columns = new ColumnDefn[capturingGroups];
+
+    for (int i = 0; i < capturingGroups; i++) {
+      columns[i] = makeColumn(fields.get(i), i);
+    }
+  }
+
+  /**
+   * Builds column definitions for an explicit projection list. Each projected
+   * name resolves to a regex group index as follows:
+   * "_unmatched_rows" is remembered as the unmatched-rows column and gets no
+   * group; "field_n" maps to group n; any other name is looked up
+   * case-insensitively among the configured field names. Names that resolve
+   * to nothing keep index -1.
+   *
+   * @throws UserException if a projected column is not a simple name
+   */
+  private void projectSubset() {
+    Collection<SchemaPath> project = this.getColumns();
+    assert !project.isEmpty();
+    columns = new ColumnDefn[project.size()];
+
+    List<String> fields = formatConfig.getFieldNames();
+    int colIndex = 0;
+
+    // Needed to retrieve unnamed fields; compiled once, not once per column.
+    Pattern unnamedField = Pattern.compile("^field_(\\d+)$");
+
+    for (SchemaPath column : project) {
+      if (column.getAsNamePart().hasChild()) {
+        throw UserException
+            .validationError()
+            .message("The log format plugin supports only simple columns")
+            .addContext("Projected column", column.toString())
+            .build(logger);
+      }
+
+      String name = column.getAsNamePart().getName();
+      Matcher m = unnamedField.matcher(name);
+      int patternIndex = -1;
+
+      if (name.equals("_unmatched_rows")) {
+        //Set boolean flag to true
+        this.unmatchedRows = true;
+        this.unmatchedColumnIndex = colIndex;
+      } else if (m.find()) {
+        //if no fields are defined in the configuration, then all the fields have names of 'field_n'
+        //Therefore n is the column index
+        patternIndex = Integer.parseInt(m.group(1));
+      } else {
+        for (int i = 0; i < fields.size(); i++) {
+          // NOTE(review): the last two tests compare the *configured* field
+          // name, not the projected one, against the implicit column names, so
+          // a schema entry called "_raw" or "_unmatched_rows" would match any
+          // projected column. Left unchanged; confirm the intent.
+          if (fields.get(i).equalsIgnoreCase(name) ||
+              fields.get(i).equals("_raw") ||
+              fields.get(i).equals("_unmatched_rows")
+              ) {
+            patternIndex = i;
+
+            break;
+          }
+        }
+      }
+      columns[colIndex++] = makeColumn(name, patternIndex);
+    }
+
+  }
+
+  /**
+   * Creates the column definition for one output column.
+   *
+   * <p>The implicit columns "_raw" and "_unmatched_rows" are always VARCHAR.
+   * Otherwise the type comes from the schema entry at {@code patternIndex};
+   * when there is no such entry, or it has no type, VARCHAR is used.</p>
+   *
+   * @param name output column name
+   * @param patternIndex regex group index feeding the column, or -1 for none
+   * @return the column definition for the resolved type
+   * @throws UserException if the configured type is unknown or unsupported
+   */
+  private ColumnDefn makeColumn(String name, int patternIndex) {
+    if (name.equals("_raw") || name.equals("_unmatched_rows")) {
+      return new VarCharDefn(name, patternIndex);
+    }
+
+    String typeName = null;
+    List<LogFormatField> schema = formatConfig.getSchema();
+    if (patternIndex >= 0 && schema != null && patternIndex < schema.size()) {
+      String configuredType = schema.get(patternIndex).getFieldType();
+      if (configuredType != null) {
+        // Locale.ROOT keeps the enum lookup independent of the JVM locale.
+        typeName = configuredType.toUpperCase(java.util.Locale.ROOT);
+      }
+    }
+    if (typeName == null) {
+      // Use VARCHAR for missing columns and missing type names
+      // (instead of Drill standard of nullable int)
+      typeName = MinorType.VARCHAR.name();
+    }
+
+    MinorType type;
+    try {
+      type = MinorType.valueOf(typeName);
+    } catch (IllegalArgumentException e) {
+      // An unknown type name gets the same user-facing error as a known but
+      // unsupported type.
+      throw UserException
+          .validationError(e)
+          .message("Undefined column types")
+          .addContext("Position", patternIndex)
+          .addContext("Field name", name)
+          .addContext("Type", typeName)
+          .build(logger);
+    }
+
+    switch (type) {
+      case VARCHAR:
+        return new VarCharDefn(name, patternIndex);
+      case INT:
+        return new IntDefn(name, patternIndex);
+      case SMALLINT:
+        return new SmallIntDefn(name, patternIndex);
+      case BIGINT:
+        return new BigIntDefn(name, patternIndex);
+      case FLOAT4:
+        return new Float4Defn(name, patternIndex);
+      case FLOAT8:
+        return new DoubleDefn(name, patternIndex);
+      case DATE:
+        return new DateDefn(name, patternIndex, formatConfig.getDateFormat(patternIndex));
+      case TIMESTAMP:
+        return new TimeStampDefn(name, patternIndex, formatConfig.getDateFormat(patternIndex));
+      case TIME:
+        return new TimeDefn(name, patternIndex, formatConfig.getDateFormat(patternIndex));
+      default:
+        throw UserException
+            .validationError()
+            .message("Undefined column types")
+            .addContext("Position", patternIndex)
+            .addContext("Field name", name)
+            .addContext("Type", typeName)
+            .build(logger);
+    }
+  }
+
+  /**
+   * Asks every projected column, in projection order, to define itself
+   * against the output mutator. The first failure aborts the scan with a
+   * system error.
+   */
+  private void defineVectors() {
+    try {
+      int position = 0;
+      while (position < columns.length) {
+        columns[position].define(outputMutator);
+        position++;
+      }
+    } catch (SchemaChangeException e) {
+      throw UserException
+          .systemError(e)
+          .message("Vector creation failed")
+          .build(logger);
+    }
+  }
+
+  @Override
+  public int next() {
+    rowIndex = 0;
+    while (nextLine()) {
+    }
+    return rowIndex;
+  }
+
+  /**
+   * Reads one line from the file and, depending on whether it matches the
+   * configured pattern, loads it into the current batch.
+   * <ul>
+   * <li>A matching line is loaded through {@code loadVectors}, unless the
+   * query projects only {@code _unmatched_rows}, in which case matching
+   * lines produce no row at all.</li>
+   * <li>A non-matching line counts as an error. It is logged while the error
+   * count is within {@code maxErrors}; once the count exceeds
+   * {@code maxErrors} the read fails. If {@code _unmatched_rows} was
+   * requested, the line is stored in that column as a row of its own.</li>
+   * </ul>
+   *
+   * @return {@code true} if the caller should read another line into the
+   *         current batch; {@code false} at end of file or when the batch
+   *         has reached {@code BATCH_SIZE} rows
+   * @throws UserException on a read failure, or when the number of
+   *         non-matching lines exceeds {@code maxErrors}
+   */
+  private boolean nextLine() {
+    String line;
+    try {
+      line = reader.readLine();
+    } catch (IOException e) {
+      throw UserException
+          .dataReadError(e)
+          .message("Error reading file:")
+          .addContext("File", fileWork.getPath())
+          .build(logger);
+    }
+
+    if (line == null) {
+      return false;
+    }
+
+    // True when the query projects nothing but _unmatched_rows.
+    boolean unmatchedOnly = unmatchedRows && columns.length == 1;
+
+    Matcher lineMatcher = pattern.matcher(line);
+    if (lineMatcher.matches()) {
+      if (unmatchedOnly) {
+        // Matching lines contribute no row here; advancing rowIndex for them
+        // would report more rows than were actually written.
+        return true;
+      }
+      loadVectors(lineMatcher);
+      return rowIndex < BATCH_SIZE;
+    }
+
+    errorCount++;
+    if (errorCount > maxErrors) {
+      throw UserException.parseError()
+          .message("Too many errors.  Max error threshold exceeded.")
+          .addContext("Line", line)
+          .addContext("Line number", rowIndex)
+          .build(logger);
+    }
+    // Every tolerated error is logged, including the one that reaches the limit.
+    logger.warn("Unmatched line: {}", line);
+
+    //If the user asked for the unmatched columns display them
+    if (unmatchedRows) {
+      // rowIndex is the write position in the current batch for both the
+      // "only _unmatched_rows" and the "mixed columns" case; it is reset for
+      // each batch by next(), so positions never carry over between batches.
+      columns[unmatchedColumnIndex].load(rowIndex, line);
+      rowIndex++;
+      if (unmatchedOnly) {
+        // Keep the legacy counter in step with the batch position.
+        unmatchedRowIndex = rowIndex;
+      }
+      return rowIndex < BATCH_SIZE;
+    }
+
+    return true;
+  }
+
+  private void loadVectors(Matcher m) {
+    String value = null;
+    /*if( unmatchedRows && columns.length == 1 ){
+      return;
+    }*/
+
+    for (int i = 0; i < columns.length; i++) {
+      //Skip the unmatched rows column
+      if (columns[i].name.equals("_unmatched_rows")) {
+        continue;
+      }
+
+      if (columns[i].index >= 0) {
+        //Get the value of the regex group
+        value = m.group(columns[i].index + 1);
+
+        //If the value is not null, assign it to the column
+        if (value != null) {
+          columns[i].load(rowIndex, value);
+        }
+      } else if (columns[i].name.equals("_raw")) {
+        //Special case.  The first is if the query contains the _raw column
+        value = m.group(0);
+        if (value != null) {
+          columns[i].load(rowIndex, value);
+        } else {
+          rowIndex++;
+        }
+      }
+    }
+    rowIndex++;
+  }
+
+  /**
+   * Closes the underlying line reader if it is still open. A failure to
+   * close is logged as a warning and otherwise ignored; calling this again
+   * afterwards does nothing.
+   */
+  @Override
+  public void close() {
+    if (reader == null) {
+      return;
+    }
+
+    try {
+      reader.close();
+    } catch (IOException e) {
+      logger.warn("Error when closing file: " + fileWork.getPath(), e);
+    }
+    reader = null;
+  }
+}
\ No newline at end of file
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/README.md b/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/README.md
new file mode 100644
index 00000000000..9110c55667c
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/README.md
@@ -0,0 +1,86 @@
+# Drill Regex/Logfile Plugin
+Plugin for Apache Drill that allows Drill to read and query arbitrary files where the schema can be defined by a regex.  The original intent was for this to be used for log files, however, it can be used for any structured data.
+
+## Example Use Case:  MySQL Log
+If you wanted to analyze log files such as the MySQL log sample shown below using Drill, it may be possible using various string functions, or you could write a UDF specific to this data; however, this is time consuming, difficult and not reusable.
+
+```
+070823 21:00:32       1 Connect     root@localhost on test1
+070823 21:00:48       1 Query       show tables
+070823 21:00:56       1 Query       select * from category
+070917 16:29:01      21 Query       select * from location
+070917 16:29:12      21 Query       select * from location where id = 1 LIMIT 1
+```
+This plugin will allow you to configure Drill to directly query logfiles of any configuration.
+
+## Configuration Options
+* **`type`**:  This tells Drill which extension to use.  In this case, it must be `logRegex`.  This field is mandatory.
+* **`regex`**:  This is the regular expression which defines how the log file lines will be split.  You must enclose the parts of the regex in grouping parentheses that you wish to extract.  Note that this plugin uses Java regular expressions and requires that shortcuts such as `\d` have an additional slash:  ie `\\d`.  This field is mandatory.
+* **`extension`**:  This option tells Drill which file extensions should be mapped to this configuration.  Note that you can have multiple configurations of this plugin to allow you to query various log files.  This field is mandatory.
+* **`maxErrors`**:  Log files can be inconsistent and messy.  The `maxErrors` variable allows you to set how many errors the reader will ignore before halting execution and throwing an error.  Defaults to 10.
+* **`schema`**:  The `schema` field is where you define the structure of the log file.  This section is optional.  If you do not define a schema, all fields will be assigned a column name of `field_n` where `n` is the index of the field. The undefined fields will be assigned a default data type of `VARCHAR`.
+
+### Defining a Schema
+The schema variable is a JSON array of fields which have, at the moment, three possible variables:
+* **`fieldName`**:  This is the name of the field.
+* **`fieldType`**:  Defines the data type.  Defaults to `VARCHAR` if undefined. At the time of writing, the reader supports: `VARCHAR`, `INT`, `SMALLINT`, `BIGINT`, `FLOAT4`, `FLOAT8`, `DATE`, `TIMESTAMP`, `TIME`.
+* **`format`**: Defines the format for date/time fields.  This is mandatory if the field is a date/time field.
+
+In the future, it is my hope that the schema section will allow for data masking, validation and other transformations that are commonly used for analysis of log files.
+
+### Example Configuration:
+The configuration below demonstrates how to configure Drill to query the example MySQL log file shown above.
+
+
+```
+"log" : {
+      "type" : "logRegex",
+      "extension" : "log",
+      "regex" : "(\\d{6})\\s(\\d{2}:\\d{2}:\\d{2})\\s+(\\d+)\\s(\\w+)\\s+(.+)",
+      "maxErrors": 10,
+      "schema": [
+        {
+          "fieldName": "eventDate",
+          "fieldType": "DATE",
+          "format": "yyMMdd"
+        },
+        {
+          "fieldName": "eventTime",
+          "fieldType": "TIME",
+          "format": "HH:mm:ss"
+        },
+        {
+          "fieldName": "PID",
+          "fieldType": "INT"
+        },
+        {
+          "fieldName": "action"
+        },
+        {
+          "fieldName": "query"
+        }
+      ]
+   }
+ ```
+
+
+## Example Usage
+
+This format plugin gives you two options for querying fields.  If you define the fields, you can query them as you would any other data source.  If you do not define a field in the `schema` variable, Drill will extract all fields and give them the name `field_n`.  The fields are indexed from `0`.  Therefore if you have a dataset with 5 fields the following query would be valid:
+
+```
+SELECT field_0, field_1, field_2, field_3, field_4
+FROM ..
+```
+
+### Implicit Fields
+In addition to the fields which the user defines, the format plugin has two implicit fields which can be useful for debugging your regex.  These fields do not appear in `SELECT *` queries and will only be retrieved when explicitly included in a query.
+
+* **`_raw`**:  This field returns the complete lines which matched your regex.
+* **`_unmatched_rows`**:  This field returns rows which **did not** match the regex.  Note: This field ONLY returns the unmatching rows, so if you have a data file of 10 lines, 8 of which match, `SELECT _unmatched_rows` will return 2 rows.  If however, you combine this with another field, such as `_raw`, the `_unmatched_rows` will be `null` when the rows match and have a value when it does not.
+
+
+
+
+
+
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFormatPluginOptionExtractor.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFormatPluginOptionExtractor.java
index 8b73b530796..f51fe4c89fb 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFormatPluginOptionExtractor.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFormatPluginOptionExtractor.java
@@ -17,11 +17,7 @@
  */
 package org.apache.drill.exec.store.dfs;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import java.util.Collection;
-
+import com.fasterxml.jackson.annotation.JsonTypeName;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.scanner.RunTimeScan;
 import org.apache.drill.common.scanner.persistence.ScanResult;
@@ -29,7 +25,10 @@
 import org.apache.drill.exec.store.image.ImageFormatConfig;
 import org.junit.Test;
 
-import com.fasterxml.jackson.annotation.JsonTypeName;
+import java.util.Collection;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
 
 
 public class TestFormatPluginOptionExtractor {
@@ -72,6 +71,9 @@ public void test() {
               "(type: String, fileSystemMetadata: boolean, descriptive: boolean, timeZone: String)", d.presentParams()
           );
           break;
+        case "logRegex":
+          assertEquals(d.typeName, "(type: String, regex: String, extension: String, maxErrors: int, schema: List)", d.presentParams());
+          break;
         default:
           fail("add validation for format plugin type " + d.typeName);
       }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/log/TestLogReader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/log/TestLogReader.java
new file mode 100644
index 00000000000..0df615e929c
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/log/TestLogReader.java
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.log;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.server.Drillbit;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.dfs.FileSystemConfig;
+import org.apache.drill.exec.store.dfs.FileSystemPlugin;
+import org.apache.drill.test.BaseDirTestWatcher;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.rowSet.RowSet;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.apache.drill.test.rowSet.schema.SchemaBuilder;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * End-to-end tests for the {@code logRegex} format plugin. The tests register
+ * several log format configurations on the {@code cp} storage plugin and then
+ * query the sample files under {@code regex/} on the classpath.
+ */
+public class TestLogReader extends ClusterTest {
+
+  public static final String DATE_ONLY_PATTERN = "(\\d\\d\\d\\d)-(\\d\\d)-(\\d\\d) .*";
+
+  @ClassRule
+  public static final BaseDirTestWatcher dirTestWatcher = new BaseDirTestWatcher();
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    ClusterTest.startCluster(ClusterFixture.builder(dirTestWatcher));
+
+    // Define a regex format config for testing.
+
+    defineRegexPlugin();
+  }
+
+  /**
+   * Registers the format configurations used by the tests on the "cp"
+   * storage plugin: a date-only config with a typed schema, a full Drill log
+   * config, a config with a TIMESTAMP column, and a schema-less MySQL log
+   * config.
+   */
+  private static void defineRegexPlugin() throws ExecutionSetupException {
+
+    // Create an instance of the regex config.
+    // Note: we can't use the ".log" extension; the Drill .gitignore
+    // file ignores such files, so they'll never get committed. Instead,
+    // make up a fake suffix.
+
+    LogFormatConfig sampleConfig = new LogFormatConfig();
+    sampleConfig.setExtension("log1");
+    sampleConfig.setRegex(DATE_ONLY_PATTERN);
+
+    sampleConfig.setSchema();
+    sampleConfig.getSchema().add(new LogFormatField("year", "INT"));
+    sampleConfig.getSchema().add(new LogFormatField("month", "INT"));
+    sampleConfig.getSchema().add(new LogFormatField("day", "INT"));
+
+    // Full Drill log parser definition.
+    // NOTE(review): this config uses the same "log1" extension as
+    // sampleConfig above, while the *.log1 tests below expect the
+    // year/month/day schema of sampleConfig - confirm which config is
+    // meant to be selected for that extension.
+
+    LogFormatConfig logConfig = new LogFormatConfig();
+    logConfig.setExtension("log1");
+    logConfig.setRegex("(\\d\\d\\d\\d)-(\\d\\d)-(\\d\\d) " +
+        "(\\d\\d):(\\d\\d):(\\d\\d),\\d+ " +
+        "\\[([^]]*)] (\\w+)\\s+(\\S+) - (.*)");
+
+    logConfig.setSchema();
+    logConfig.getSchema().add(new LogFormatField("year", "INT"));
+    logConfig.getSchema().add(new LogFormatField("month", "INT"));
+    logConfig.getSchema().add(new LogFormatField("day", "INT"));
+    logConfig.getSchema().add(new LogFormatField("hour", "INT"));
+    logConfig.getSchema().add(new LogFormatField("minute", "INT"));
+    logConfig.getSchema().add(new LogFormatField("second", "INT"));
+    logConfig.getSchema().add(new LogFormatField("thread"));
+    logConfig.getSchema().add(new LogFormatField("level"));
+    logConfig.getSchema().add(new LogFormatField("module"));
+    logConfig.getSchema().add(new LogFormatField("message"));
+
+    //Set up additional configs to check the time/date formats
+    LogFormatConfig logDateConfig = new LogFormatConfig();
+    logDateConfig.setExtension("log2");
+    logDateConfig.setRegex("(\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}),(\\d+)\\s\\[(\\w+)\\]\\s([A-Z]+)\\s(.+)");
+
+    logDateConfig.setSchema();
+    // The sample data has a four-digit year and a 24-hour clock
+    // (e.g. "2017-12-17 10:52:41"), hence yyyy and HH.
+    logDateConfig.getSchema().add(new LogFormatField("entry_date", "TIMESTAMP", "yyyy-MM-dd HH:mm:ss"));
+    logDateConfig.getSchema().add(new LogFormatField("pid", "INT"));
+    logDateConfig.getSchema().add(new LogFormatField("location"));
+    logDateConfig.getSchema().add(new LogFormatField("message_type"));
+    logDateConfig.getSchema().add(new LogFormatField("message"));
+
+    logDateConfig.setMaxErrors(3);
+
+    // No schema on purpose: exercises the generated field_n column names.
+    LogFormatConfig mysqlLogConfig = new LogFormatConfig();
+    mysqlLogConfig.setExtension("sqllog");
+    mysqlLogConfig.setRegex("(\\d{6})\\s(\\d{2}:\\d{2}:\\d{2})\\s+(\\d+)\\s(\\w+)\\s+(.+)");
+
+    // Define a temporary format plugin for the "cp" storage plugin.
+    Drillbit drillbit = cluster.drillbit();
+    final StoragePluginRegistry pluginRegistry = drillbit.getContext().getStorage();
+    final FileSystemPlugin plugin = (FileSystemPlugin) pluginRegistry.getPlugin("cp");
+    final FileSystemConfig pluginConfig = (FileSystemConfig) plugin.getConfig();
+    pluginConfig.getFormats().put("sample", sampleConfig);
+    pluginConfig.getFormats().put("drill-log", logConfig);
+    pluginConfig.getFormats().put("date-log", logDateConfig);
+    pluginConfig.getFormats().put("mysql-log", mysqlLogConfig);
+    pluginRegistry.createOrUpdate("cp", pluginConfig, false);
+
+  }
+
+  /** A wildcard query returns the schema-defined columns for matching lines only. */
+  @Test
+  public void testWildcard() throws RpcException {
+    String sql = "SELECT * FROM cp.`regex/simple.log1`";
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+    BatchSchema expectedSchema = new SchemaBuilder()
+        .addNullable("year", MinorType.INT)
+        .addNullable("month", MinorType.INT)
+        .addNullable("day", MinorType.INT)
+        .build();
+
+    RowSet expected = client.rowSetBuilder(expectedSchema)
+        .addRow(2017, 12, 17)
+        .addRow(2017, 12, 18)
+        .addRow(2017, 12, 19)
+        .build();
+
+    new RowSetComparison(expected).verifyAndClearAll(results);
+  }
+
+  /** Explicitly projected columns come back in projection order. */
+  @Test
+  public void testExplicit() throws RpcException {
+    String sql = "SELECT `day`, `month` FROM cp.`regex/simple.log1`";
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+    BatchSchema expectedSchema = new SchemaBuilder()
+        .addNullable("day", MinorType.INT)
+        .addNullable("month", MinorType.INT)
+        .build();
+
+    RowSet expected = client.rowSetBuilder(expectedSchema)
+        .addRow(17, 12)
+        .addRow(18, 12)
+        .addRow(19, 12)
+        .build();
+
+    new RowSetComparison(expected).verifyAndClearAll(results);
+  }
+
+  /** A column that is not in the schema is returned as a null VARCHAR. */
+  @Test
+  public void testMissing() throws RpcException {
+    String sql = "SELECT `day`, `missing`, `month` FROM cp.`regex/simple.log1`";
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+    BatchSchema expectedSchema = new SchemaBuilder()
+        .addNullable("day", MinorType.INT)
+        .addNullable("missing", MinorType.VARCHAR)
+        .addNullable("month", MinorType.INT)
+        .build();
+
+    RowSet expected = client.rowSetBuilder(expectedSchema)
+        .addRow(17, null, 12)
+        .addRow(18, null, 12)
+        .addRow(19, null, 12)
+        .build();
+
+    new RowSetComparison(expected).verifyAndClearAll(results);
+  }
+
+  /** The implicit _raw column returns each complete matching line. */
+  @Test
+  public void testRaw() throws RpcException {
+    String sql = "SELECT `_raw` FROM cp.`regex/simple.log1`";
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+    BatchSchema expectedSchema = new SchemaBuilder()
+        .addNullable("_raw", MinorType.VARCHAR)
+        .build();
+
+    RowSet expected = client.rowSetBuilder(expectedSchema)
+        .addRow("2017-12-17 10:52:41,820 [main] INFO  o.a.d.e.e.f.FunctionImplementationRegistry - Function registry loaded.  459 functions loaded in 1396 ms.")
+        .addRow("2017-12-18 10:52:37,652 [main] INFO  o.a.drill.common.config.DrillConfig - Configuration and plugin file(s) identified in 115ms.")
+        .addRow("2017-12-19 11:12:27,278 [main] ERROR o.apache.drill.exec.server.Drillbit - Failure during initial startup of Drillbit.")
+        .build();
+    new RowSetComparison(expected).verifyAndClearAll(results);
+  }
+
+  /** A field declared as TIMESTAMP in the schema is surfaced with that type. */
+  @Test
+  public void testDate() throws RpcException {
+    String sql = "SELECT TYPEOF(`entry_date`) AS entry_date FROM cp.`regex/simple.log2` LIMIT 1";
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+    BatchSchema expectedSchema = new SchemaBuilder()
+        .add("entry_date", MinorType.VARCHAR)
+        .build();
+
+    RowSet expected = client.rowSetBuilder(expectedSchema)
+        .addRow("TIMESTAMP")
+        .build();
+
+    new RowSetComparison(expected).verifyAndClearAll(results);
+
+  }
+
+  /** COUNT(*) counts only the lines that match the regex. */
+  @Test
+  public void testCount() throws RpcException {
+    String sql = "SELECT COUNT(*) FROM cp.`regex/simple.log1`";
+    long result = client.queryBuilder().sql(sql).singletonLong();
+    assertEquals(3, result);
+  }
+
+  /** Smoke test: runs a wildcard query and prints the result; nothing is asserted. */
+  @Test
+  public void testFull() throws RpcException {
+    String sql = "SELECT * FROM cp.`regex/simple.log1`";
+    client.queryBuilder().sql(sql).printCsv();
+  }
+
+  //This section tests log queries without a defined schema
+
+  /** Without a schema, a wildcard query yields VARCHAR columns named field_n. */
+  @Test
+  public void testStarQueryNoSchema() throws RpcException {
+    String sql = "SELECT * FROM cp.`regex/mysql.sqllog`";
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+    BatchSchema expectedSchema = new SchemaBuilder()
+        .addNullable("field_0", MinorType.VARCHAR)
+        .addNullable("field_1", MinorType.VARCHAR)
+        .addNullable("field_2", MinorType.VARCHAR)
+        .addNullable("field_3", MinorType.VARCHAR)
+        .addNullable("field_4", MinorType.VARCHAR)
+        .build();
+
+    RowSet expected = client.rowSetBuilder(expectedSchema)
+        .addRow("070823", "21:00:32", "1", "Connect", "root@localhost on test1")
+        .addRow("070823", "21:00:48", "1", "Query", "show tables")
+        .addRow("070823", "21:00:56", "1", "Query", "select * from category")
+        .addRow("070917", "16:29:01", "21", "Query", "select * from location")
+        .addRow("070917", "16:29:12", "21", "Query", "select * from location where id = 1 LIMIT 1")
+        .build();
+
+    new RowSetComparison(expected).verifyAndClearAll(results);
+  }
+
+  /** Without a schema, every field_n column can be projected explicitly. */
+  @Test
+  public void testAllFieldsQueryNoSchema() throws RpcException {
+    String sql = "SELECT field_0, field_1, field_2, field_3, field_4 FROM cp.`regex/mysql.sqllog`";
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+    BatchSchema expectedSchema = new SchemaBuilder()
+        .addNullable("field_0", MinorType.VARCHAR)
+        .addNullable("field_1", MinorType.VARCHAR)
+        .addNullable("field_2", MinorType.VARCHAR)
+        .addNullable("field_3", MinorType.VARCHAR)
+        .addNullable("field_4", MinorType.VARCHAR)
+        .build();
+
+    RowSet expected = client.rowSetBuilder(expectedSchema)
+        .addRow("070823", "21:00:32", "1", "Connect", "root@localhost on test1")
+        .addRow("070823", "21:00:48", "1", "Query", "show tables")
+        .addRow("070823", "21:00:56", "1", "Query", "select * from category")
+        .addRow("070917", "16:29:01", "21", "Query", "select * from location")
+        .addRow("070917", "16:29:12", "21", "Query", "select * from location where id = 1 LIMIT 1")
+        .build();
+
+    new RowSetComparison(expected).verifyAndClearAll(results);
+  }
+
+  /** Without a schema, a subset of the field_n columns can be projected. */
+  @Test
+  public void testSomeFieldsQueryNoSchema() throws RpcException {
+    String sql = "SELECT field_0, field_4 FROM cp.`regex/mysql.sqllog`";
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+    BatchSchema expectedSchema = new SchemaBuilder()
+        .addNullable("field_0", MinorType.VARCHAR)
+        .addNullable("field_4", MinorType.VARCHAR)
+        .build();
+
+    RowSet expected = client.rowSetBuilder(expectedSchema)
+        .addRow("070823", "root@localhost on test1")
+        .addRow("070823", "show tables")
+        .addRow("070823", "select * from category")
+        .addRow("070917", "select * from location")
+        .addRow("070917", "select * from location where id = 1 LIMIT 1")
+        .build();
+
+    new RowSetComparison(expected).verifyAndClearAll(results);
+  }
+
+  /** The implicit _raw column works without a schema. */
+  @Test
+  public void testRawNoSchema() throws RpcException {
+    String sql = "SELECT _raw FROM cp.`regex/mysql.sqllog`";
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+    BatchSchema expectedSchema = new SchemaBuilder()
+        .addNullable("_raw", MinorType.VARCHAR)
+        .build();
+
+    RowSet expected = client.rowSetBuilder(expectedSchema)
+        .addRow("070823 21:00:32       1 Connect     root@localhost on test1")
+        .addRow("070823 21:00:48       1 Query       show tables")
+        .addRow("070823 21:00:56       1 Query       select * from category")
+        .addRow("070917 16:29:01      21 Query       select * from location")
+        .addRow("070917 16:29:12      21 Query       select * from location where id = 1 LIMIT 1")
+        .build();
+
+    new RowSetComparison(expected).verifyAndClearAll(results);
+  }
+
+  /** Selecting only _unmatched_rows returns just the lines that did not match. */
+  @Test
+  public void testUMNoSchema() throws RpcException {
+    String sql = "SELECT _unmatched_rows FROM cp.`regex/mysql.sqllog`";
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+    BatchSchema expectedSchema = new SchemaBuilder()
+        .addNullable("_unmatched_rows", MinorType.VARCHAR)
+        .build();
+
+    RowSet expected = client.rowSetBuilder(expectedSchema)
+        .addRow("dfadkfjaldkjafsdfjlksdjflksjdlkfjsldkfjslkjl")
+        .build();
+
+    new RowSetComparison(expected).verifyAndClearAll(results);
+  }
+
+  /**
+   * Selecting _raw together with _unmatched_rows returns one row per line:
+   * matching lines fill _raw, non-matching lines fill _unmatched_rows.
+   */
+  @Test
+  public void testRawUMNoSchema() throws RpcException {
+    String sql = "SELECT _raw, _unmatched_rows FROM cp.`regex/mysql.sqllog`";
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+    BatchSchema expectedSchema = new SchemaBuilder()
+        .addNullable("_raw", MinorType.VARCHAR)
+        .addNullable("_unmatched_rows", MinorType.VARCHAR)
+        .build();
+
+    RowSet expected = client.rowSetBuilder(expectedSchema)
+        .addRow("070823 21:00:32       1 Connect     root@localhost on test1", null)
+        .addRow("070823 21:00:48       1 Query       show tables", null)
+        .addRow("070823 21:00:56       1 Query       select * from category", null)
+        .addRow("070917 16:29:01      21 Query       select * from location", null)
+        .addRow("070917 16:29:12      21 Query       select * from location where id = 1 LIMIT 1", null)
+        .addRow(null, "dfadkfjaldkjafsdfjlksdjflksjdlkfjsldkfjslkjl")
+        .build();
+
+    new RowSetComparison(expected).verifyAndClearAll(results);
+  }
+
+}
\ No newline at end of file
diff --git a/exec/java-exec/src/test/resources/regex/baddates.log2 b/exec/java-exec/src/test/resources/regex/baddates.log2
new file mode 100644
index 00000000000..64b6b776537
--- /dev/null
+++ b/exec/java-exec/src/test/resources/regex/baddates.log2
@@ -0,0 +1,5 @@
+2017-14-17 10:52:41,820 [main] INFO  o.a.d.e.e.f.FunctionImplementationRegistry - Function registry loaded.  459 functions loaded in 1396 ms.
+2017-15-18 10:52:37,652 [main] INFO  o.a.drill.common.config.DrillConfig - Configuration and plugin file(s) identified in 115ms.
+Base Configuration:
+    - jar:file:/foo/apache-drill-1.13.0-SNAPSHOT/jars/drill-common-1.13.0-SNAPSHOT.jar!/drill-default.conf
+2017-16-19 11:12:27,278 [main] ERROR o.apache.drill.exec.server.Drillbit - Failure during initial startup of Drillbit.
diff --git a/exec/java-exec/src/test/resources/regex/mysql.sqllog b/exec/java-exec/src/test/resources/regex/mysql.sqllog
new file mode 100644
index 00000000000..3d1b1280254
--- /dev/null
+++ b/exec/java-exec/src/test/resources/regex/mysql.sqllog
@@ -0,0 +1,6 @@
+070823 21:00:32       1 Connect     root@localhost on test1
+070823 21:00:48       1 Query       show tables
+070823 21:00:56       1 Query       select * from category
+070917 16:29:01      21 Query       select * from location
+070917 16:29:12      21 Query       select * from location where id = 1 LIMIT 1
+dfadkfjaldkjafsdfjlksdjflksjdlkfjsldkfjslkjl
\ No newline at end of file
diff --git a/exec/java-exec/src/test/resources/regex/mysql.sqllog2 b/exec/java-exec/src/test/resources/regex/mysql.sqllog2
new file mode 100644
index 00000000000..3d1b1280254
--- /dev/null
+++ b/exec/java-exec/src/test/resources/regex/mysql.sqllog2
@@ -0,0 +1,6 @@
+070823 21:00:32       1 Connect     root@localhost on test1
+070823 21:00:48       1 Query       show tables
+070823 21:00:56       1 Query       select * from category
+070917 16:29:01      21 Query       select * from location
+070917 16:29:12      21 Query       select * from location where id = 1 LIMIT 1
+dfadkfjaldkjafsdfjlksdjflksjdlkfjsldkfjslkjl
\ No newline at end of file
diff --git a/exec/java-exec/src/test/resources/regex/simple.log1 b/exec/java-exec/src/test/resources/regex/simple.log1
new file mode 100644
index 00000000000..06df7bea9b2
--- /dev/null
+++ b/exec/java-exec/src/test/resources/regex/simple.log1
@@ -0,0 +1,5 @@
+2017-12-17 10:52:41,820 [main] INFO  o.a.d.e.e.f.FunctionImplementationRegistry - Function registry loaded.  459 functions loaded in 1396 ms.
+2017-12-18 10:52:37,652 [main] INFO  o.a.drill.common.config.DrillConfig - Configuration and plugin file(s) identified in 115ms.
+Base Configuration:
+    - jar:file:/foo/apache-drill-1.13.0-SNAPSHOT/jars/drill-common-1.13.0-SNAPSHOT.jar!/drill-default.conf
+2017-12-19 11:12:27,278 [main] ERROR o.apache.drill.exec.server.Drillbit - Failure during initial startup of Drillbit.
diff --git a/exec/java-exec/src/test/resources/regex/simple.log2 b/exec/java-exec/src/test/resources/regex/simple.log2
new file mode 100644
index 00000000000..06df7bea9b2
--- /dev/null
+++ b/exec/java-exec/src/test/resources/regex/simple.log2
@@ -0,0 +1,5 @@
+2017-12-17 10:52:41,820 [main] INFO  o.a.d.e.e.f.FunctionImplementationRegistry - Function registry loaded.  459 functions loaded in 1396 ms.
+2017-12-18 10:52:37,652 [main] INFO  o.a.drill.common.config.DrillConfig - Configuration and plugin file(s) identified in 115ms.
+Base Configuration:
+    - jar:file:/foo/apache-drill-1.13.0-SNAPSHOT/jars/drill-common-1.13.0-SNAPSHOT.jar!/drill-default.conf
+2017-12-19 11:12:27,278 [main] ERROR o.apache.drill.exec.server.Drillbit - Failure during initial startup of Drillbit.
diff --git a/pom.xml b/pom.xml
index 56582c31641..7cb7c1b28d7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -322,6 +322,11 @@
         <configuration>
           <excludeSubprojects>false</excludeSubprojects>
           <excludes>
+            <!-- Types log1, log2, sqllog and sqllog2 are used to test the logRegex format plugin. -->
+            <exclude>**/*.log1</exclude>
+            <exclude>**/*.log2</exclude>
+            <exclude>**/*.sqllog</exclude>
+            <exclude>**/*.sqllog2</exclude>
             <exclude>**/*.log</exclude>
             <exclude>**/*.css</exclude>
             <exclude>**/*.js</exclude>
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
index 7162eadfeca..64703c33ee7 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
@@ -545,6 +545,10 @@ private FragmentState(int index, int value) {
      * <code>JDBC_SCAN = 44;</code>
      */
     JDBC_SCAN(44, 44),
+    /**
+     * <code>REGEX_SUB_SCAN = 45;</code>
+     */
+    REGEX_SUB_SCAN(45,45),
     ;
 
     /**
@@ -727,6 +731,11 @@ private FragmentState(int index, int value) {
      * <code>JDBC_SCAN = 44;</code>
      */
     public static final int JDBC_SCAN_VALUE = 44;
+    /**
+     * <code>REGEX_SUB_SCAN = 45;</code>
+     */
+    public static final int REGEX_SUB_SCAN_VALUE = 45;
+
 
 
     public final int getNumber() { return value; }
@@ -778,6 +787,7 @@ public static CoreOperatorType valueOf(int value) {
         case 42: return UNNEST;
         case 43: return HIVE_DRILL_NATIVE_PARQUET_ROW_GROUP_SCAN;
         case 44: return JDBC_SCAN;
+        case 45: return REGEX_SUB_SCAN;
         default: return null;
       }
     }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services