Posted to dev@drill.apache.org by GitBox <gi...@apache.org> on 2021/11/04 14:51:56 UTC

[GitHub] [drill] dzamo commented on a change in pull request #2282: DRILL-7978: Fixed Width Format Plugin

dzamo commented on a change in pull request #2282:
URL: https://github.com/apache/drill/pull/2282#discussion_r742913804



##########
File path: contrib/format-fixedwidth/src/main/java/org/apache/drill/exec/store/fixedwidth/FixedwidthBatchReader.java
##########
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.fixedwidth;
+
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
+import org.apache.hadoop.mapred.FileSplit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Locale;
+
+public class FixedwidthBatchReader implements ManagedReader<FileSchemaNegotiator> {
+
+  private static final Logger logger = LoggerFactory.getLogger(FixedwidthBatchReader.class);
+  private FileSplit split;
+  private final int maxRecords;
+  private final FixedwidthFormatConfig config;
+  private CustomErrorContext errorContext;
+  private InputStream fsStream;
+  private ResultSetLoader loader;
+  private RowSetLoader writer;
+  private BufferedReader reader;
+  private int lineNum;
+
+  public FixedwidthBatchReader(FixedwidthFormatConfig config, int maxRecords) {
+    this.config = config;
+    this.maxRecords = maxRecords;
+  }
+
+
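+  // Opens the file split, registers the table schema built from the format config, and prepares the reader and result set loader.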
+  @Override
+  public boolean open(FileSchemaNegotiator negotiator) {
+    split = negotiator.split();
+    errorContext = negotiator.parentErrorContext();
+    lineNum = 0;
+    try {
+      fsStream = negotiator.fileSystem().openPossiblyCompressedStream(split.getPath());
+      negotiator.tableSchema(buildSchema(), true);
+      loader = negotiator.build();
+    } catch (Exception e) {
+      throw UserException
+        .dataReadError(e)
+        .message("Failed to open input file: {}", split.getPath().toString())
+        .addContext(errorContext)
+        .addContext(e.getMessage())
+        .build(logger);
+    }
+    reader = new BufferedReader(new InputStreamReader(fsStream, Charsets.UTF_8));
+    return true;
+  }
+
+  @Override
+  public boolean next() { // Use the result set loader to convert lines from the file into Drill rows
+    String line;
+    writer = loader.writer();
+
+    try {
+      while (!writer.isFull()) {
+        line = reader.readLine();
+        if (line == null) {
+          return false;  // end of file: no more rows to read
+        }
+        lineNum++;
+        writer.start();
+        parseLine(line, writer);
+        writer.save();
+        if (writer.limitReached(maxRecords)) {
+          return false;  // stop reading once the maxRecords limit has been reached
+        }
+      }
+    } catch (IOException e) {
+      throw UserException
+        .dataReadError(e)
+        .message("Failed to read input file: %s", split.getPath().toString())
+        .addContext(errorContext)
+        .addContext(e.getMessage())
+        .addContext("Line Number", lineNum)
+        .build(logger);
+    }
+    return true;  // batch is full, but more data may remain in the file
+  }
+
+  @Override
+  public void close() {
+    if (fsStream != null) {
+      // Closing the reader also closes the wrapped input stream
+      AutoCloseables.closeSilently(reader, fsStream);
+      reader = null;
+      fsStream = null;
+    }
+  }
+
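+  // Builds the table schema from the field definitions in the format config.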
+  private TupleMetadata buildSchema() {
+    SchemaBuilder builder = new SchemaBuilder();
+    for (FixedwidthFieldConfig field : config.getFields()) {
+      if (field.getType() == TypeProtos.MinorType.VARDECIMAL) {
+        // TODO: revisit this hard-coded precision (38) and scale (4)
+        builder.addNullable(field.getName(), TypeProtos.MinorType.VARDECIMAL, 38, 4);
+      } else {
+        builder.addNullable(field.getName(), field.getType());
+      }
+    }
+    return builder.buildSchema();
+  }
+
+
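+  // Parses one fixed-width line, writing each configured field's value to the row writer.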
+  private boolean parseLine(String line, RowSetLoader writer) throws IOException {
+    int i = 0;
+    TypeProtos.MinorType dataType;
+    String dateTimeFormat;
+    String value;
+    for (FixedwidthFieldConfig field : config.getFields()) {

Review comment:
       @cgivre @MFoss19 @estherbuchwalter here we are reading column data types from the format config, where we also specify their start and stop offsets.  But this format plugin can also accept data types from a provided schema.  So my question is: can we drop the data type information from the format config so that we don't introduce multiple ad-hoc ways of specifying this info?  This is genuinely a question because I don't know this subject well...
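
       For illustration only, here is a rough sketch of how open() might register the schema if the types did not live in the format config. It assumes the EVF negotiator exposes a provided schema via providedSchema(), and the all-VARCHAR fallback is just an example, not a proposal for the final code:

           // Sketch only: prefer a provided schema, otherwise build an all-VARCHAR schema
           // from the field names and offsets in the format config.
           TupleMetadata providedSchema = negotiator.providedSchema();
           if (providedSchema != null) {
             negotiator.tableSchema(providedSchema, true);
           } else {
             SchemaBuilder builder = new SchemaBuilder();
             for (FixedwidthFieldConfig field : config.getFields()) {
               // Without type information in the config, every column defaults to VARCHAR;
               // a CAST or a provided schema would supply anything more specific.
               builder.addNullable(field.getName(), TypeProtos.MinorType.VARCHAR);
             }
             negotiator.tableSchema(builder.buildSchema(), true);
           }
           loader = negotiator.build();

       With something like that, a schema registered through CREATE OR REPLACE SCHEMA could carry the INT/DATE/DECIMAL types, and the format config would only describe field names and offsets.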




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@drill.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org