You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by GitBox <gi...@apache.org> on 2019/11/05 13:26:20 UTC

[GitHub] [drill] cgivre commented on a change in pull request #1778: Drill-7233: Format Plugin for HDF5

cgivre commented on a change in pull request #1778: Drill-7233: Format Plugin for HDF5
URL: https://github.com/apache/drill/pull/1778#discussion_r342560510
 
 

 ##########
 File path: contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5BatchReader.java
 ##########
 @@ -0,0 +1,887 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.hdf5;
+
+import ch.systemsx.cisd.hdf5.HDF5CompoundMemberInformation;
+import ch.systemsx.cisd.hdf5.HDF5DataClass;
+import ch.systemsx.cisd.hdf5.HDF5DataSetInformation;
+import ch.systemsx.cisd.hdf5.HDF5FactoryProvider;
+import ch.systemsx.cisd.hdf5.HDF5LinkInformation;
+import ch.systemsx.cisd.hdf5.IHDF5Factory;
+import ch.systemsx.cisd.hdf5.IHDF5Reader;
+import org.apache.commons.io.IOUtils;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.expr.holders.BigIntHolder;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.MetadataUtils;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter;
+import org.apache.hadoop.mapred.FileSplit;
+import org.joda.time.Instant;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.file.StandardCopyOption;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.BitSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+
+public class HDF5BatchReader implements ManagedReader<FileSchemaNegotiator> {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HDF5BatchReader.class);
+  private FileSplit split;
+  private HDF5FormatConfig formatConfig;
+  private ResultSetLoader loader;
+  private String tempFileName;
+  private IHDF5Reader HDF5reader;
+  private File infile;
+  private BufferedReader reader;
+  protected HDF5ReaderConfig readerConfig;
+  private boolean finish;
+
+
+  public static class HDF5ReaderConfig {
+    protected final HDF5FormatPlugin plugin;
+    protected TupleMetadata schema;
+    protected String defaultPath;
+    protected HDF5FormatConfig formatConfig;
+
+    public HDF5ReaderConfig(HDF5FormatPlugin plugin, HDF5FormatConfig formatConfig) {
+      this.plugin = plugin;
+      this.formatConfig = formatConfig;
+      this.defaultPath = formatConfig.getDefaultPath();
+    }
+  }
+
+
+  public HDF5BatchReader(HDF5ReaderConfig readerConfig) {
+    this.readerConfig = readerConfig;
+    this.formatConfig = readerConfig.formatConfig;
+  }
+
+  @Override
+  public boolean open(FileSchemaNegotiator negotiator) {
+    split = negotiator.split();
+    loader = negotiator.build();
+    openFile(negotiator);
+    this.loader = negotiator.build();
+    return true;
+  }
+
+  private void openFile(FileSchemaNegotiator negotiator) {
+    InputStream in;
+    try {
+      in = negotiator.fileSystem().open(split.getPath());
+      IHDF5Factory factory = HDF5FactoryProvider.get();
+      this.infile = convertInputStreamToFile(in);
+      this.HDF5reader = factory.openForReading(infile);
+    } catch (Exception e) {
+      throw UserException
+        .dataReadError(e)
+        .message("Failed to open open input file: %s", split.getPath())
+        .build(logger);
+    }
+    reader = new BufferedReader(new InputStreamReader(in));
+  }
+
+  /**
+   * This function converts the Drill inputstream into a File object for the HDF5 library.  This function
+   * exists due to a known limitation in the HDF5 library which cannot parse HDF5 directly from an input stream.  A future
+   * release of the library will support this.
+   *
+   * @param stream
+   * @return
+   * @throws IOException
+   */
+  private File convertInputStreamToFile(InputStream stream) throws IOException {
+    this.tempFileName = "./~" + split.getPath().getName();
+    File targetFile = new File(tempFileName);
+    java.nio.file.Files.copy(stream, targetFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
+    IOUtils.closeQuietly(stream);
+    return targetFile;
+  }
+
+  @Override
+  public boolean next() {
+    RowSetLoader rowWriter = loader.writer();
+    while (!rowWriter.isFull()) {
+      if (this.formatConfig.getDefaultPath() == null || this.formatConfig.getDefaultPath().isEmpty()) {
+        return projectFileMetadata(rowWriter);
 
 Review comment:
   I completely rewrote this section.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services